diff --git a/docs-next/content/docs/guides/reliability/circuit-breakers.mdx b/docs-next/content/docs/guides/reliability/circuit-breakers.mdx new file mode 100644 index 0000000..7991cef --- /dev/null +++ b/docs-next/content/docs/guides/reliability/circuit-breakers.mdx @@ -0,0 +1,154 @@ +--- +title: Circuit Breakers +description: "Sample-based half-open recovery — protect downstream services from cascading failures." +--- + +Circuit breakers prevent cascading failures by temporarily stopping task +execution when a task fails repeatedly. This is especially useful for tasks +that call external APIs or services. + +## How it works + +A circuit breaker tracks failures within a time window and transitions +through three states: + + Closed + Closed --> Open: failures >= threshold within window + Open --> HalfOpen: cooldown elapsed + HalfOpen --> Closed: success rate >= threshold + HalfOpen --> Open: success rate impossible OR timeout`} +/> + +- **Closed** — normal operation. Tasks execute as usual. Failures are counted. +- **Open** — too many failures. Tasks are immediately rejected without execution. +- **Half-Open** — after the cooldown period, up to N probe requests are + allowed through (default 5). Success and failure counts are tracked. The + circuit closes when the success rate meets the threshold (default 80%). If + too many probes fail and the threshold becomes mathematically impossible, + the circuit immediately re-opens. If probes don't complete within the + cooldown period, the circuit re-opens as a safety valve. + +## Configuration + +Enable circuit breakers per task using the `circuit_breaker` parameter: + +```python +@queue.task( + circuit_breaker={ + "threshold": 5, # Open after 5 failures + "window": 60, # Within a 60-second window + "cooldown": 300, # Stay open for 5 minutes before half-open + "half_open_probes": 5, # Allow 5 probe requests in half-open + "half_open_success_rate": 0.8, # Close when 80% of probes succeed + } +) +def call_external_api(endpoint: str) -> dict: + return requests.get(endpoint).json() +``` + +| Parameter | Type | Default | Description | +|-----------|------|---------|-------------| +| `threshold` | `int` | `5` | Number of failures to trigger the breaker | +| `window` | `int` | `60` | Time window in seconds for counting failures | +| `cooldown` | `int` | `300` | Seconds to wait before allowing a test request | +| `half_open_probes` | `int` | `5` | Number of probe requests allowed in half-open | +| `half_open_success_rate` | `float` | `0.8` | Required success rate (0.0–1.0) to close from half-open | + +## Inspecting circuit breaker state + +### Python API + +```python +breakers = queue.circuit_breakers() +for cb in breakers: + print(f"{cb['task_name']}: {cb['state']} (failures: {cb['failure_count']})") +``` + +### Dashboard API + +```bash +curl http://localhost:8080/api/circuit-breakers +``` + +```json +[ + { + "task_name": "myapp.tasks.call_external_api", + "state": "open", + "failure_count": 5, + "last_failure": 1700000010000, + "cooldown_until": 1700000310000 + } +] +``` + +## When to use + +Circuit breakers are most useful for tasks that interact with external systems: + +- **External API calls** — prevent hammering a down service +- **Database connections** — stop retrying when the database is unreachable +- **Third-party services** — email providers, payment gateways, etc. + +For purely internal computation tasks, circuit breakers are usually +unnecessary — standard retries with backoff are sufficient. 
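## Half-open recovery in code

The subtlest rule in the lifecycle above is the early re-open: during half-open, the circuit re-opens as soon as the required success rate can no longer be reached, rather than waiting for all probes to finish. Below is a minimal sketch of that decision, assuming the defaults of 5 probes and an 80% success rate (illustrative only, not taskito's implementation):

```python
def half_open_decision(probe_results: list[bool], probes: int = 5, success_rate: float = 0.8) -> str:
    """Return 'open', 'closed', or 'probing' given the probe outcomes so far."""
    needed = success_rate * probes           # 4.0 successes with the defaults
    succeeded = sum(probe_results)           # True counts as 1
    remaining = probes - len(probe_results)  # probes not yet attempted

    if succeeded + remaining < needed:
        return "open"       # threshold is now mathematically impossible: re-open
    if len(probe_results) == probes and succeeded >= needed:
        return "closed"     # enough probes succeeded: close the circuit
    return "probing"        # keep letting probes through


# Two failed probes already make 4 out of 5 impossible, so the circuit re-opens:
print(half_open_decision([False, False]))                   # 'open'
print(half_open_decision([True, True, True, True, False]))  # 'closed'
```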
+ +## Combining with retries + +Circuit breakers and retries work together. A task with both will: + +1. Retry on failure up to `max_retries` times (with backoff) +2. Count each final failure toward the circuit breaker threshold +3. Once the breaker opens, new jobs for that task are rejected immediately + +```python +@queue.task( + max_retries=3, + retry_backoff=2.0, + circuit_breaker={"threshold": 5, "window": 120, "cooldown": 600}, +) +def send_email(to: str, subject: str, body: str): + smtp.send(to, subject, body) +``` + +## Examples + +### External payment API + +```python +@queue.task( + max_retries=3, + circuit_breaker={"threshold": 3, "window": 60, "cooldown": 120}, +) +def charge_customer(customer_id: str, amount: float): + response = requests.post( + "https://api.payment-provider.com/charge", + json={"customer": customer_id, "amount": amount}, + timeout=10, + ) + response.raise_for_status() + return response.json() +``` + +If the payment API goes down, the circuit breaker opens after 3 failures +within 60 seconds, preventing a flood of requests to the failing service. +After 2 minutes, up to 5 probe requests are allowed through. If at least 80% +succeed (4 out of 5), the circuit closes. + +### Health check with monitoring + +```python +from taskito.events import EventType + +# Log when circuit breakers change state +def monitor_breakers(event_type: EventType, payload: dict): + breakers = queue.circuit_breakers() + open_breakers = [b for b in breakers if b["state"] == "open"] + if open_breakers: + names = ", ".join(b["task_name"] for b in open_breakers) + print(f"WARNING: Open circuit breakers: {names}") + +queue.on_event(EventType.JOB_FAILED, monitor_breakers) +``` diff --git a/docs-next/content/docs/guides/reliability/error-handling.mdx b/docs-next/content/docs/guides/reliability/error-handling.mdx new file mode 100644 index 0000000..f3d6710 --- /dev/null +++ b/docs-next/content/docs/guides/reliability/error-handling.mdx @@ -0,0 +1,368 @@ +--- +title: Error Handling & Troubleshooting +description: "Debug task failures, inspect error history, common failure modes, and the exception hierarchy." +--- + +import { Callout } from "fumadocs-ui/components/callout"; + +This guide covers how to debug task failures, inspect error history, handle +common problems, and understand what happens when things go wrong. + +## Task failure lifecycle + +When a task raises an exception: + +1. The error message is recorded in the `job_errors` table +2. If `retry_count < max_retries`, the job is rescheduled with exponential backoff +3. If all retries are exhausted, the job moves to the **dead letter queue** (status: `dead`) + + B["Error recorded"] + B --> C{"Retries left?"} + C -->|Yes| D["Reschedule with backoff"] + C -->|No| E["Move to DLQ"]`} +/> + +## Inspecting error history + +Every failed attempt is recorded. Use `job.errors` to see the full history: + +```python +job = unreliable_task.delay() + +# After the job fails and retries... +for error in job.errors: + print(f"Attempt {error['attempt']}: {error['error']} at {error['failed_at']}") +``` + +Each error entry: + +| Field | Type | Description | +|---|---|---| +| `id` | `str` | Unique error record ID | +| `job_id` | `str` | The job this error belongs to | +| `attempt` | `int` | Attempt number (0-indexed) | +| `error` | `str` | Error message string | +| `failed_at` | `int` | Timestamp in milliseconds | + +## Diagnosing dead letters + +When a job exhausts all retries, it lands in the DLQ. 
Inspect dead letters to +understand what went wrong: + +```python +dead = queue.dead_letters(limit=20) + +for d in dead: + print(f"Task: {d['task_name']}") + print(f"Error: {d['error']}") + print(f"Retries: {d['retry_count']}") + print(f"Queue: {d['queue']}") + print() +``` + +### Common patterns + +**Same error on every attempt** — The failure is deterministic (e.g., bad +arguments, missing dependency). Fix the root cause, then replay: + +```python +new_job_id = queue.retry_dead(dead[0]["id"]) +``` + +**Intermittent errors** — The failure is transient (e.g., network timeout). +Replaying will likely succeed: + +```python +# Replay all dead letters +for d in dead: + queue.retry_dead(d["id"]) +``` + + + Replayed jobs preserve the original job's `priority`, `max_retries`, + `timeout`, and `result_ttl` settings — no need to re-specify them. + + +**Error message mentions serialization** — see Serialization Errors below. + +## Common error scenarios + +### Serialization errors + +taskito uses `cloudpickle` to serialize task arguments and return values. +Serialization fails when: + +- **Passing unpicklable objects**: open file handles, database connections, sockets, thread locks +- **Module-level objects that can't be resolved**: dynamically generated classes without stable import paths +- **Very large objects**: while cloudpickle has no hard limit, extremely large payloads slow down SQLite writes + +**Fix**: pass simple, serializable data (strings, numbers, dicts, lists) as +task arguments. Reconstruct complex objects inside the task: + +```python +# Bad — passing a connection object +@queue.task() +def query(conn, sql): # conn can't be pickled + return conn.execute(sql) + +# Good — pass connection info, create inside the task +@queue.task() +def query(db_url, sql): + conn = create_connection(db_url) + return conn.execute(sql) +``` + +### Task not found in registry + +``` +KeyError: "Task 'myapp.process_data' not found in registry" +``` + +This means the worker doesn't have the task registered. Causes: + +- The module defining the task wasn't imported before starting the worker +- The task name changed (function renamed or moved to a different module) +- The `--app` flag points to the wrong module + +**Fix**: ensure all task modules are imported when your `Queue` instance is +created. Typically, define tasks in the same module as the queue, or import +them in your app's `__init__.py`. + +### SQLite lock contention + +``` +OperationalError: database is locked +``` + +SQLite allows concurrent reads (WAL mode), but only one writer at a time. +taskito sets `busy_timeout=5000ms` to wait for locks, but heavy write loads +can still cause contention. + +**Causes:** + +- Multiple processes writing to the same SQLite file simultaneously +- Very large batch inserts blocking the writer +- Long-running transactions from external tools (e.g., DB browser) holding locks + +**Fixes:** + +- Avoid opening the database file with other tools while the worker is running +- Use `enqueue_many()` / `task.map()` for batch inserts — they use a single transaction +- If running multiple processes, ensure only one worker process writes at a time + +### Timeout reaping + +``` +Task timed out after 10s +``` + +If a task exceeds its `timeout`, the scheduler detects it (every ~5 seconds) +and marks it as failed, triggering retry/DLQ logic. + +```python +@queue.task(timeout=30) # 30 second timeout +def long_task(): + ... +``` + + + Timeout reaping marks the job as failed, but **does not kill the Python + thread**. 
The task function continues running until it finishes — the + result is simply discarded. For CPU-bound tasks that might hang, consider + adding your own internal timeout logic. + + +### Soft timeouts + +While hard timeouts (above) are enforced by the scheduler, **soft timeouts** +let the task itself react to time pressure: + +```python +from taskito import current_job + +@queue.task(soft_timeout=30) +def long_task(): + for chunk in data_chunks: + process(chunk) + current_job.check_timeout() # raises SoftTimeoutError after 30s +``` + +| Timeout type | Mechanism | Exception | +|---|---|---| +| Hard timeout (`timeout`) | Scheduler reaps the job externally | `TaskTimeoutError` (internal) | +| Soft timeout (`soft_timeout`) | Task checks elapsed time via `check_timeout()` | `SoftTimeoutError` | + +Soft timeouts are **cooperative** — the task must call `check_timeout()` at +safe points. + +### Worker crash behavior + +If the worker process crashes (kill -9, OOM, power loss): + +- **Running jobs** stay in `running` status in SQLite +- On the next worker start, the scheduler's **stale job reaper** detects jobs + that have been `running` longer than their timeout and marks them as failed +- If no timeout is set, stale jobs with no timeout remain in `running` status + until manually cleaned up + +**Recommendation**: always set a `timeout` on tasks so that stale jobs are +automatically recovered: + +```python +@queue.task(timeout=300) # 5 minute timeout +def process(data): + ... +``` + +## Debugging tips + +### Check queue stats + +Quick health check: + +```python +stats = queue.stats() +print(stats) +# {'pending': 0, 'running': 0, 'completed': 450, 'failed': 2, 'dead': 1, 'cancelled': 0} +``` + +Or via CLI: + +```bash +taskito info --app myapp:queue --watch +``` + +### Use hooks for alerting + +Set up failure hooks to get notified immediately: + +```python +@queue.on_failure +def on_task_failure(task_name, args, kwargs, error): + print(f"FAILED: {task_name} — {error}") + # Send to Slack, PagerDuty, etc. +``` + +### Test mode for isolation + +Use [test mode](/docs/guides/operations) to run tasks synchronously and inspect +errors without a worker: + +```python +with queue.test_mode() as results: + risky_task.delay() + + if results[0].failed: + print(results[0].error) + print(results[0].traceback) +``` + +### Inspect the SQLite database directly + +For deep debugging, query the SQLite database: + +```bash +sqlite3 myapp.db "SELECT id, task_name, status, error FROM jobs WHERE status = 3 LIMIT 10;" +``` + +Status codes: 0=pending, 1=running, 2=complete, 3=failed, 4=dead, 5=cancelled. + +## Error handling patterns + +### Exception filtering + +Use `retry_on` and `dont_retry_on` to control which exceptions trigger retries: + +```python +@queue.task( + max_retries=5, + retry_on=[ConnectionError, TimeoutError], + dont_retry_on=[ValueError], +) +def call_api(url): + resp = requests.get(url, timeout=10) + resp.raise_for_status() + return resp.json() +``` + +See [Retries — Exception Filtering](/docs/guides/reliability/retries) for details. + +### Cancelling running tasks + +Cancel a job that is already executing: + +```python +# Request cancellation +queue.cancel_running_job(job_id) +``` + +Inside the task, check for cancellation at safe points: + +```python +from taskito import current_job + +@queue.task() +def long_task(items): + for item in items: + process(item) + current_job.check_cancelled() # raises TaskCancelledError +``` + +`check_cancelled()` raises `TaskCancelledError` if cancellation was requested. 
+Place these checks at natural breakpoints in long-running tasks. + + + Cancellation is **cooperative** — the task must call `check_cancelled()` to + observe it. If the task never checks, it runs to completion. + + +### Cleanup on failure + +Use try/finally or hooks to clean up resources: + +```python +@queue.task() +def process_file(path): + tmp = download_to_temp(path) + try: + return parse(tmp) + finally: + os.unlink(tmp) +``` + +## Exception hierarchy + +taskito defines a hierarchy of exceptions for precise error handling: + +``` +TaskitoError (base) +├── TaskTimeoutError — hard timeout exceeded +├── SoftTimeoutError — soft timeout exceeded (check_timeout) +├── TaskCancelledError — task cancelled (check_cancelled) +├── MaxRetriesExceededError — all retry attempts exhausted +├── SerializationError — serialization/deserialization failure +├── CircuitBreakerOpenError — circuit breaker is open +├── RateLimitExceededError — rate limit exceeded +├── JobNotFoundError — job ID not found (also a KeyError) +└── QueueError — queue-level operational error +``` + +All exceptions inherit from `TaskitoError`, so you can catch the base class +for broad handling: + +```python +from taskito import TaskitoError, SoftTimeoutError, TaskCancelledError + +try: + result = job.result(timeout=30) +except TaskitoError as e: + print(f"Taskito error: {e}") +``` + +Import any exception directly from the `taskito` package: + +```python +from taskito import TaskCancelledError, SoftTimeoutError, SerializationError +``` diff --git a/docs-next/content/docs/guides/reliability/guarantees.mdx b/docs-next/content/docs/guides/reliability/guarantees.mdx new file mode 100644 index 0000000..9a187f7 --- /dev/null +++ b/docs-next/content/docs/guides/reliability/guarantees.mdx @@ -0,0 +1,147 @@ +--- +title: Delivery Guarantees +description: "At-least-once delivery, claim_execution, idempotency patterns, deduplication." +--- + +Taskito provides **at-least-once delivery**. Every enqueued job will be +executed at least once, but may be executed more than once if a worker +crashes mid-execution. + +## What this means + +- A job **will not be lost** — if a worker dies, the scheduler detects the stale job and retries it +- A job **may run twice** — if a worker crashes after starting but before marking the job complete +- A job **will not run concurrently** — `claim_execution` prevents two workers from picking up the same job + +## Why not exactly-once? + +Exactly-once delivery is impossible in distributed systems without two-phase +commit. Taskito's approach matches Celery, SQS, and most production job +systems: deliver at least once, design tasks to handle duplicates. + +## How recovery works + +>DB: dequeue + claim_execution + S->>W: dispatch job + W->>W: execute task + Note over W: Worker crashes here + Note over S: timeout_ms elapses... + S->>DB: reap_stale_jobs detects stuck job + S->>DB: mark failed + schedule retry + S->>W: dispatch again (new attempt) + W->>DB: complete + clear claim`} +/> + +The `claim_execution` mechanism prevents two workers from executing the same +job simultaneously. But it cannot prevent re-execution after a crash — the +claim is cleared when the stale reaper detects the timeout. + +## Writing idempotent tasks + +Since tasks may run more than once, design them to be safe on re-execution: + +### Use database upserts + +```python +@queue.task() +def create_user(email, name): + # UPSERT — safe to run twice + db.execute( + "INSERT INTO users (email, name) VALUES (?, ?) 
" + "ON CONFLICT (email) DO UPDATE SET name = ?", + (email, name, name), + ) +``` + +### Use idempotency keys + +```python +@queue.task() +def charge_customer(order_id, amount): + # Check if already charged + if db.execute("SELECT 1 FROM charges WHERE order_id = ?", (order_id,)).fetchone(): + return # Already processed + + payment_provider.charge(amount, idempotency_key=f"order-{order_id}") + db.execute("INSERT INTO charges (order_id, amount) VALUES (?, ?)", (order_id, amount)) +``` + +### Use unique tasks for deduplication + +```python +# Only one pending/running instance per key +job = send_report.apply_async( + args=(user_id,), + unique_key=f"report-{user_id}", +) +``` + +If a job with the same `unique_key` is already pending or running, the +duplicate is silently dropped. See +[Unique Tasks](/docs/guides/advanced-execution) for details. + +### Avoid side effects that can't be undone + +```python +# Bad — sends duplicate emails on retry +@queue.task() +def notify(user_id): + send_email(user_id, "Your order shipped") + +# Good — check before sending +@queue.task() +def notify(user_id): + if not db.execute("SELECT notified FROM orders WHERE user_id = ?", (user_id,)).fetchone()[0]: + send_email(user_id, "Your order shipped") + db.execute("UPDATE orders SET notified = 1 WHERE user_id = ?", (user_id,)) +``` + +## Deduplication window + +`unique_key` prevents duplicate enqueue only while a job with that key is +**pending or running**. Once the job completes (or is dead-lettered/cancelled), +the same `unique_key` can be enqueued again. + +```python +job1 = task.apply_async(args=(1,), unique_key="order-123") # Enqueued +job2 = task.apply_async(args=(1,), unique_key="order-123") # Skipped (job1 pending) +# ... job1 completes ... +job3 = task.apply_async(args=(1,), unique_key="order-123") # Enqueued (new job) +``` + +## How claim execution works + +Before dispatching a job to a worker thread, the scheduler calls +`claim_execution(job_id, worker_id)`. This is an atomic `SET NX` (SQLite: +`INSERT OR IGNORE`, Postgres: `INSERT ... ON CONFLICT DO NOTHING`, Redis: +`SET NX`). If another scheduler instance already claimed the job, the claim +fails and the job is skipped. + +This prevents **duplicate dispatch** (two workers picking up the same job). +It does NOT prevent **duplicate execution** after a crash — the claim is +cleared by the stale reaper when it detects the timeout. 
+ +## Framework vs task responsibility + +| Concern | Who handles it | +|---------|---------------| +| Job dispatch deduplication | Framework (`claim_execution`) | +| Job enqueue deduplication | Framework (`unique_key`) | +| Crash recovery | Framework (stale reaper) | +| Idempotent execution | **You** (task code) | +| Side-effect safety | **You** (task code) | + +## Summary + +| Guarantee | Taskito | Celery | SQS | +|-----------|---------|--------|-----| +| Delivery | At-least-once | At-least-once | At-least-once | +| Duplicate prevention | `claim_execution` (dispatch-level) | Visibility timeout | Visibility timeout | +| Deduplication | `unique_key` (enqueue-level) | Manual | Message dedup ID | +| Crash recovery | Stale reaper (timeout-based) | Worker ack timeout | Visibility timeout | diff --git a/docs-next/content/docs/guides/reliability/index.mdx b/docs-next/content/docs/guides/reliability/index.mdx index 760825c..d14893e 100644 --- a/docs-next/content/docs/guides/reliability/index.mdx +++ b/docs-next/content/docs/guides/reliability/index.mdx @@ -1,10 +1,15 @@ --- title: Reliability -description: "Retries, timeouts, dead-lettering, circuit breakers." +description: "Harden your task queue for production workloads." --- -import { Callout } from 'fumadocs-ui/components/callout'; +Harden your task queue for production workloads. - - Content port pending. See the [Zensical source](https://github.com/ByteVeda/taskito/tree/master/docs) for current text. - +| Guide | Description | +|---|---| +| [Retries & Dead Letters](/docs/guides/reliability/retries) | Automatic retries with exponential backoff, dead letter queue | +| [Error Handling](/docs/guides/reliability/error-handling) | Task failure lifecycle, error inspection, debugging patterns | +| [Delivery Guarantees](/docs/guides/reliability/guarantees) | At-least-once delivery, idempotency, and exactly-once patterns | +| [Rate Limiting](/docs/guides/reliability/rate-limiting) | Throttle task execution with token bucket rate limits | +| [Circuit Breakers](/docs/guides/reliability/circuit-breakers) | Protect downstream services from cascading failures | +| [Distributed Locking](/docs/guides/reliability/locking) | Mutual exclusion across workers with database-backed locks | diff --git a/docs-next/content/docs/guides/reliability/locking.mdx b/docs-next/content/docs/guides/reliability/locking.mdx new file mode 100644 index 0000000..1c4a329 --- /dev/null +++ b/docs-next/content/docs/guides/reliability/locking.mdx @@ -0,0 +1,148 @@ +--- +title: Distributed Locking +description: "Database-backed mutual-exclusion across workers — sync, async, auto-extend." +--- + +import { Callout } from "fumadocs-ui/components/callout"; + +taskito provides a distributed lock primitive backed by the same database +used for the task queue. Locks work across multiple worker processes and +machines sharing the same database. + +## Overview + +Use distributed locks when multiple workers or processes must not execute +the same critical section at the same time — for example, refreshing a +shared cache, running a singleton periodic task, or accessing an external +API with a single-writer constraint. + +## Sync context manager + +```python +with queue.lock("cache-refresh"): + refresh_cache() +``` + +The lock is automatically released when the `with` block exits, even if an +exception is raised. 
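For example, a task that must run on only one worker at a time can try the lock and skip the run when another worker already holds it (a sketch; `rebuild_search_index` and `do_rebuild` are hypothetical names):

```python
from taskito import LockNotAcquired

@queue.task()
def rebuild_search_index():
    try:
        # No timeout: raise immediately if another worker holds the lock.
        with queue.lock("search-index-rebuild", ttl=60):
            do_rebuild()
    except LockNotAcquired:
        return "skipped: another worker is already rebuilding"
```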
+ +### Parameters + +```python +queue.lock( + name: str, + ttl: int = 30, + auto_extend: bool = True, + owner_id: str | None = None, + timeout: float | None = None, + retry_interval: float = 0.1, +) +``` + +| Parameter | Type | Default | Description | +|---|---|---|---| +| `name` | `str` | — | Lock name. All processes using the same name compete for the same lock. | +| `ttl` | `int` | `30` | Lock TTL in seconds. Auto-extended if `auto_extend=True`. | +| `auto_extend` | `bool` | `True` | Automatically extend the lock before it expires (background thread). | +| `owner_id` | `str \| None` | `None` | Custom owner identifier. Defaults to a random UUID per acquisition. | +| `timeout` | `float \| None` | `None` | Max seconds to wait for the lock. `None` raises immediately if unavailable. | +| `retry_interval` | `float` | `0.1` | Seconds between retry attempts when waiting for the lock. | + +## Async context manager + +```python +async with queue.alock("cache-refresh"): + await refresh_cache() +``` + +`alock()` accepts the same parameters as `lock()` and is safe to use inside +async functions and FastAPI/Django async views. + +## Auto-extension + +When `auto_extend=True` (the default), a background thread extends the lock's +TTL at roughly half the TTL interval. This prevents the lock from expiring +during a long-running operation without requiring you to set an artificially +large TTL. + +```python +# This lock will stay alive for as long as the block runs, +# even if it takes several minutes. +with queue.lock("long-job", ttl=30, auto_extend=True): + run_slow_operation() +``` + +## Acquisition timeout + +By default, `lock()` raises `LockNotAcquired` immediately if the lock is held +by another process. Pass `timeout` to wait: + +```python +try: + with queue.lock("resource", timeout=5.0): + do_work() +except LockNotAcquired: + print("Could not acquire lock within 5 seconds") +``` + +The lock is retried every `retry_interval` seconds until `timeout` is +exceeded. + +## Cross-process locking + +Because lock state is stored in the database, locks are effective across +multiple worker processes on the same machine or different machines sharing +the same database: + +```python +# Process A (machine 1) +with queue.lock("billing-run"): + run_billing() + +# Process B (machine 2) — will wait or raise while process A holds the lock +with queue.lock("billing-run"): + run_billing() +``` + + + On SQLite, cross-process locking works via WAL mode and exclusive + transactions. For multi-machine deployments, use the PostgreSQL backend + where `SELECT FOR UPDATE SKIP LOCKED` provides true distributed semantics. + + +## Error handling + +```python +from taskito import LockNotAcquired + +try: + with queue.lock("my-lock", timeout=2.0): + critical_section() +except LockNotAcquired: + # Another process holds the lock; handle gracefully + log.warning("Skipping — another process is running the critical section") +``` + +## Low-level API + +For advanced use cases, you can manage locks manually without the context manager: + +```python +# Acquire +lock_id = queue._inner.acquire_lock("my-lock", ttl=30) + +# Extend +queue._inner.extend_lock("my-lock", lock_id, ttl=30) + +# Inspect +info = queue._inner.get_lock_info("my-lock") +# {"name": "my-lock", "owner_id": "...", "expires_at": 1710000000} + +# Release +queue._inner.release_lock("my-lock", lock_id) +``` + + + The low-level API skips auto-extension and does not release on exception. + Prefer the context manager (`lock()` / `alock()`) for production code. 
+ diff --git a/docs-next/content/docs/guides/reliability/meta.json b/docs-next/content/docs/guides/reliability/meta.json index d7966a6..f39c2d1 100644 --- a/docs-next/content/docs/guides/reliability/meta.json +++ b/docs-next/content/docs/guides/reliability/meta.json @@ -1,4 +1,12 @@ { "title": "Reliability", - "pages": ["index"] + "pages": [ + "index", + "retries", + "error-handling", + "guarantees", + "rate-limiting", + "circuit-breakers", + "locking" + ] } diff --git a/docs-next/content/docs/guides/reliability/rate-limiting.mdx b/docs-next/content/docs/guides/reliability/rate-limiting.mdx new file mode 100644 index 0000000..efe2a58 --- /dev/null +++ b/docs-next/content/docs/guides/reliability/rate-limiting.mdx @@ -0,0 +1,76 @@ +--- +title: Rate Limiting +description: "Token bucket rate limits per task — count/period syntax, persistence, retries." +--- + +import { Callout } from "fumadocs-ui/components/callout"; + +taskito uses a **token bucket** algorithm to limit how fast tasks execute. +Rate limits are per-task and persisted in SQLite. + +## Usage + +```python +@queue.task(rate_limit="100/m") # 100 per minute +def send_email(to, subject, body): + ... + +@queue.task(rate_limit="10/s") # 10 per second +def api_call(endpoint): + ... + +@queue.task(rate_limit="3600/h") # 3600 per hour +def generate_report(report_id): + ... +``` + +## Syntax + +Rate limits use the format `count/period`: + +| Format | Meaning | +|---|---| +| `"10/s"` | 10 per second | +| `"100/m"` | 100 per minute | +| `"3600/h"` | 3600 per hour | + +## How it works + +The token bucket algorithm: + +1. Each task name has a bucket with `max_tokens = count` and a `refill_rate = count / period` +2. Before dispatching a job, the scheduler checks if a token is available +3. If a token is available, it's consumed and the job is dispatched +4. If no tokens are available, the job is **rescheduled** 1 second in the future + + + Token bucket state (current tokens, last refill time) is stored in the + `rate_limits` SQLite table. This means rate limits survive worker restarts. + + +## Per-task, not per-queue + +Rate limits apply to the **task name**, regardless of which queue the job is in: + +```python +@queue.task(rate_limit="10/s", queue="emails") +def send_email(to, subject, body): + ... + +# Both of these are rate-limited together (same task name) +send_email.delay("alice@example.com", "Hi", "Body") +send_email.apply_async(args=("bob@example.com", "Hi", "Body"), queue="urgent") +``` + +## Combining with retries + +Rate limiting and retries work together seamlessly. If a rate-limited task +fails and retries, the retry attempt is also subject to the rate limit: + +```python +@queue.task(rate_limit="5/s", max_retries=3, retry_backoff=2.0) +def external_api(url): + response = requests.get(url) + response.raise_for_status() + return response.json() +``` diff --git a/docs-next/content/docs/guides/reliability/retries.mdx b/docs-next/content/docs/guides/reliability/retries.mdx new file mode 100644 index 0000000..4e09bbb --- /dev/null +++ b/docs-next/content/docs/guides/reliability/retries.mdx @@ -0,0 +1,173 @@ +--- +title: Retries & Dead Letters +description: "Automatic retries with exponential backoff, exception filtering, dead letter queue." +--- + +import { Callout } from "fumadocs-ui/components/callout"; + +taskito automatically retries failed tasks with exponential backoff and moves +permanently failed jobs to a dead letter queue. 
+ +## Retry policy + +Configure retries at the task level: + +```python +@queue.task(max_retries=5, retry_backoff=2.0) +def flaky_api_call(url): + response = requests.get(url) + response.raise_for_status() + return response.json() +``` + +| Parameter | Default | Description | +|---|---|---| +| `max_retries` | `3` | Maximum retry attempts before DLQ | +| `retry_backoff` | `1.0` | Base delay in seconds for exponential backoff | + +### Backoff formula + +``` +delay = min(max_delay, base_delay * 2^retry_count) + jitter +``` + +- `base_delay` = `retry_backoff` (in seconds) +- `max_delay` = 300 seconds (5 minutes) +- `jitter` = random 0–500ms to prevent thundering herd + +**Example with `retry_backoff=2.0`:** + +| Attempt | Delay | +|---|---| +| 1st retry | ~2s | +| 2nd retry | ~4s | +| 3rd retry | ~8s | +| 4th retry | ~16s | +| 5th retry | ~32s | + +## Exception filtering + +Control which exceptions trigger retries with `retry_on` and `dont_retry_on`: + +```python +@queue.task( + max_retries=5, + retry_on=[ConnectionError, TimeoutError], + dont_retry_on=[ValueError], +) +def fetch_data(url): + response = requests.get(url) + response.raise_for_status() + return response.json() +``` + +| Parameter | Description | +|---|---| +| `retry_on` | Whitelist — only retry on these exception types. All others skip straight to DLQ. | +| `dont_retry_on` | Blacklist — never retry on these exception types, even if retries remain. | + +If neither is set, all exceptions trigger retries (default behavior). + + + `retry_on` and `dont_retry_on` are mutually exclusive in practice — if + `retry_on` is set, only those exceptions are retried regardless of + `dont_retry_on`. + + +## Retry flow + + B{Success?} + B -->|Yes| C["Status: Complete
Store result"] + B -->|No| D["Record error in
job_errors table"] + D --> SR{"Exception passes
retry_on / dont_retry_on?"} + SR -->|No| I["Move to Dead Letter Queue
Status: Dead"] + SR -->|Yes| E{"retry_count < max_retries?"} + E -->|Yes| F["Calculate backoff delay"] + F --> G["Status: Pending
retry_count += 1"] + G --> H["Wait for scheduled time"] + H --> A + E -->|No| I`} +/> + +## Dead letter queue + +Jobs that exhaust all retries are moved to the DLQ for inspection and manual replay. + +### Inspect dead letters + +```python +# List the 10 most recent dead letters +dead = queue.dead_letters(limit=10, offset=0) + +for d in dead: + print(f"Job: {d['original_job_id']}") + print(f"Task: {d['task_name']}") + print(f"Error: {d['error']}") + print(f"Retries: {d['retry_count']}") + print() +``` + +### Replay dead letters + +```python +# Re-enqueue a dead letter job (creates a new job) +new_job_id = queue.retry_dead(dead[0]["id"]) +``` + + + Replayed jobs preserve the original job's `priority`, `max_retries`, + `timeout`, and `result_ttl` settings. You don't need to re-specify them — the + DLQ stores the full configuration. + + +### Purge old dead letters + +```python +# Delete dead letters older than 24 hours +deleted = queue.purge_dead(older_than=86400) +print(f"Purged {deleted} dead letter(s)") +``` + +## Error history + +Every failed attempt is recorded with the error message. Access the full +history via `job.errors`: + +```python +@queue.task(max_retries=3) +def unreliable(): + raise ConnectionError("timeout") + +job = unreliable.delay() + +# After the job fails and retries... +for error in job.errors: + print(f"Attempt {error['attempt']}: {error['error']}") + # Attempt 0: timeout + # Attempt 1: timeout + # Attempt 2: timeout +``` + +Each error entry contains: + +| Field | Type | Description | +|---|---|---| +| `id` | `str` | Unique error record ID | +| `job_id` | `str` | The job this error belongs to | +| `attempt` | `int` | Attempt number (0-indexed) | +| `error` | `str` | Error message | +| `failed_at` | `int` | Timestamp in milliseconds | + +## Timeout reaping + +If a task exceeds its `timeout`, the scheduler automatically detects it +(checking every ~5 seconds) and treats it as a failure — triggering the +retry/DLQ logic. + +```python +@queue.task(timeout=10) # 10 second timeout +def slow_task(): + time.sleep(60) # Will be reaped after 10s +```