diff --git a/docs-next/content/docs/guides/extensibility/events-webhooks.mdx b/docs-next/content/docs/guides/extensibility/events-webhooks.mdx new file mode 100644 index 0000000..03d13d7 --- /dev/null +++ b/docs-next/content/docs/guides/extensibility/events-webhooks.mdx @@ -0,0 +1,285 @@ +--- +title: Events & Webhooks +description: "In-process event bus and HMAC-signed HTTP webhooks for job and worker lifecycle events." +--- + +taskito includes an in-process event bus and webhook delivery system for +reacting to job lifecycle events. + +## Event types + +The `EventType` enum defines all available lifecycle events: + +| Event | Fired when | Payload fields | +|-------|------------|----------------| +| `JOB_ENQUEUED` | A job is added to the queue | `job_id`, `task_name`, `queue` | +| `JOB_COMPLETED` | A job finishes successfully | `job_id`, `task_name`, `queue` | +| `JOB_FAILED` | A job raises an exception (before retry) | `job_id`, `task_name`, `queue`, `error` | +| `JOB_RETRYING` | A failed job will be retried | `job_id`, `task_name`, `error`, `retry_count` | +| `JOB_DEAD` | A job exhausts all retries and enters the DLQ | `job_id`, `task_name`, `error` | +| `JOB_CANCELLED` | A job is cancelled | `job_id`, `task_name` | +| `WORKER_STARTED` | A worker process/thread comes online | `worker_id`, `hostname` | +| `WORKER_STOPPED` | A worker process/thread shuts down | `worker_id`, `hostname` | +| `WORKER_ONLINE` | Worker registered in storage (visible to fleet) | `worker_id`, `queues`, `pool` | +| `WORKER_OFFLINE` | Dead worker reaped (no heartbeat for 30s) | `worker_id` | +| `WORKER_UNHEALTHY` | Resource health transitions to unhealthy | `worker_id`, `resources` | +| `QUEUE_PAUSED` | A named queue is paused | `queue` | +| `QUEUE_RESUMED` | A paused queue is resumed | `queue` | + +`JOB_RETRYING`, `JOB_DEAD`, and `JOB_CANCELLED` are emitted by the Rust +result handler immediately after the scheduler records the outcome. 
+Middleware hooks (`on_retry`, `on_dead_letter`, `on_cancel`) are called in +the same result-handling pass, after the event fires. + +`QUEUE_PAUSED` and `QUEUE_RESUMED` are emitted synchronously by +`queue.pause()` and `queue.resume()` after the queue state is written to +storage. + +## Registering listeners + +Use `queue.on_event()` to subscribe a callback to a specific event type: + +```python +from taskito import Queue +from taskito.events import EventType + +queue = Queue(db_path="tasks.db") + +def on_failure(event_type: EventType, payload: dict): + print(f"Job {payload['job_id']} failed: {payload.get('error')}") + +queue.on_event(EventType.JOB_FAILED, on_failure) +``` + +### Callback signature + +All callbacks receive two arguments: + +- `event_type` (`EventType`) — the event that occurred +- `payload` (`dict`) — event details including `job_id`, `task_name`, `queue`, and event-specific fields + +### Async delivery + +Callbacks are dispatched asynchronously in a `ThreadPoolExecutor`. The +thread pool size defaults to 4 and can be configured via +`Queue(event_workers=N)`. This means: + +- Callbacks never block the worker +- Exceptions in callbacks are logged but do not affect job processing +- Callbacks may execute slightly after the event occurs + +## Webhooks + +For external systems, register webhook URLs to receive HTTP POST requests +on job events. + +### Registering a webhook + +```python +queue.add_webhook( + url="https://example.com/hooks/taskito", + events=[EventType.JOB_FAILED, EventType.JOB_DEAD], + headers={"Authorization": "Bearer mytoken"}, + secret="my-signing-secret", +) +``` + +| Parameter | Type | Default | Description | +|-----------|------|---------|-------------| +| `url` | `str` | — | URL to POST event payloads to (must be `http://` or `https://`) | +| `events` | `list[EventType] \| None` | `None` | Event types to subscribe to. 
`None` means all events | +| `headers` | `dict[str, str] \| None` | `None` | Extra HTTP headers to include in requests | +| `secret` | `str \| None` | `None` | HMAC-SHA256 signing secret | +| `max_retries` | `int` | `3` | Maximum delivery attempts | +| `timeout` | `float` | `10.0` | HTTP request timeout in seconds | +| `retry_backoff` | `float` | `2.0` | Base for exponential backoff between retries | + +### HMAC signing + +When a `secret` is provided, each webhook request includes an +`X-Taskito-Signature` header: + +``` +X-Taskito-Signature: sha256= +``` + +The signature is computed over the JSON request body using HMAC-SHA256. +Verify it on the receiving end: + +```python +import hashlib +import hmac + +def verify_signature(body: bytes, signature: str, secret: str) -> bool: + expected = hmac.new(secret.encode(), body, hashlib.sha256).hexdigest() + return hmac.compare_digest(f"sha256={expected}", signature) +``` + +### Retry behavior + +Failed webhook deliveries are retried with exponential backoff. The number +of attempts, request timeout, and backoff base are configurable per webhook +via `max_retries`, `timeout`, and `retry_backoff`. With the defaults +(`max_retries=3`, `retry_backoff=2.0`): + +| Attempt | Delay before next retry | +|---------|------------------------| +| 1st retry | 1 second (`2.0 ** 0`) | +| 2nd retry | 2 seconds (`2.0 ** 1`) | +| 3rd retry | — (final) | + +4xx responses are not retried. If all attempts fail, a warning is logged +and the event is dropped. 
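Taking the table above at face value, the delay before retry *n* is `retry_backoff ** (n - 1)` seconds. A small sketch to compute a schedule — the formula is inferred from the documented defaults, not taken from taskito's source:

```python
def backoff_schedule(max_retries: int = 3, retry_backoff: float = 2.0) -> list[float]:
    """Delay in seconds before each retry attempt.

    The final attempt is not followed by another retry, so it
    contributes no delay.
    """
    return [retry_backoff ** n for n in range(max_retries - 1)]

print(backoff_schedule())        # defaults: [1.0, 2.0]
print(backoff_schedule(5, 3.0))  # steeper backoff: [1.0, 3.0, 9.0, 27.0]
```

With the defaults this reproduces the table: 1 second before the first retry, 2 seconds before the second, nothing after the final attempt.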
+ +### Event filtering + +Subscribe to specific events or all events: + +```python +# Only failure events +queue.add_webhook( + url="https://slack.example.com/webhook", + events=[EventType.JOB_FAILED, EventType.JOB_DEAD], +) + +# All events +queue.add_webhook(url="https://monitoring.example.com/events") +``` + +## Examples + +### Slack notification on job failure + +```python +import requests +from taskito.events import EventType + +def notify_slack(event_type: EventType, payload: dict): + requests.post( + "https://hooks.slack.com/services/T.../B.../xxx", + json={ + "text": f":x: Task `{payload['task_name']}` failed\n" + f"Job ID: `{payload['job_id']}`\n" + f"Error: {payload.get('error', 'unknown')}" + }, + ) + +queue.on_event(EventType.JOB_FAILED, notify_slack) +queue.on_event(EventType.JOB_DEAD, notify_slack) +``` + +### Webhook to external monitoring + +```python +queue.add_webhook( + url="https://monitoring.example.com/api/taskito-events", + events=[EventType.JOB_COMPLETED, EventType.JOB_FAILED, EventType.JOB_DEAD], + secret="whsec_abc123", + headers={"X-Source": "taskito-prod"}, +) +``` + +The monitoring service receives JSON payloads like: + +```json +{ + "event": "job.failed", + "job_id": "01H5K6X...", + "task_name": "myapp.tasks.process", + "queue": "default", + "error": "ConnectionError: ..." 
+} +``` + +### Job completion tracking + +```python +from taskito.events import EventType + +completed_count = 0 + +def track_completion(event_type: EventType, payload: dict): + global completed_count + completed_count += 1 + if completed_count % 100 == 0: + print(f"Milestone: {completed_count} jobs completed") + +queue.on_event(EventType.JOB_COMPLETED, track_completion) +``` + +### Database logging for audit trail + +```python +from taskito.events import EventType + +def audit_log(event_type: EventType, payload: dict): + db.execute( + "INSERT INTO audit_log (event, job_id, task_name, timestamp) VALUES (?, ?, ?, ?)", + (event_type.value, payload["job_id"], payload["task_name"], time.time()), + ) + +# Subscribe to all important events +for event in [EventType.JOB_ENQUEUED, EventType.JOB_COMPLETED, EventType.JOB_FAILED, EventType.JOB_DEAD]: + queue.on_event(event, audit_log) +``` + +## Event ordering + +Events fire in the order the scheduler processes results — typically the +order jobs complete. For jobs that complete nearly simultaneously, ordering +is **not guaranteed** across different workers or threads. + +Within a single job's lifecycle, events always fire in this order: + +1. `JOB_ENQUEUED` (at enqueue time) +2. `JOB_COMPLETED` / `JOB_FAILED` / `JOB_CANCELLED` (at completion) +3. `JOB_RETRYING` (if retried, before the next attempt) +4. `JOB_DEAD` (if all retries exhausted) + +## Backpressure + +Events are dispatched to a thread pool (default size: 4, configurable via +`event_workers=N`). If callbacks are slow and events arrive faster than +they can be processed, they queue in memory. + +For high-volume event scenarios: + +```python +queue = Queue(event_workers=16) # More threads for slow callbacks +``` + +If a callback raises an exception, it is logged and the event is dropped — +it does not retry or block other callbacks. + +## Webhook failure + +Webhooks retry with exponential backoff (up to `max_retries`). 
After all +retries are exhausted, the webhook delivery is **logged and dropped** — +there is no dead-letter queue for webhooks. Monitor webhook failures via +the `on_failure` callback or structured logging. + +### Webhook receiver (Flask) + +A minimal Flask app that receives and verifies taskito webhooks: + +```python +from flask import Flask, request, abort +import hashlib, hmac + +app = Flask(__name__) +WEBHOOK_SECRET = "my-signing-secret" + +@app.route("/hooks/taskito", methods=["POST"]) +def receive_webhook(): + signature = request.headers.get("X-Taskito-Signature", "") + expected = hmac.new( + WEBHOOK_SECRET.encode(), request.data, hashlib.sha256 + ).hexdigest() + + if not hmac.compare_digest(f"sha256={expected}", signature): + abort(401) + + event = request.json + print(f"Received event: {event['event']} for job {event['job_id']}") + return "", 204 +``` diff --git a/docs-next/content/docs/guides/extensibility/index.mdx b/docs-next/content/docs/guides/extensibility/index.mdx index 52dd06b..1950567 100644 --- a/docs-next/content/docs/guides/extensibility/index.mdx +++ b/docs-next/content/docs/guides/extensibility/index.mdx @@ -1,10 +1,12 @@ --- title: Extensibility -description: "Middleware, custom serializers, interception." +description: "Extend taskito at every stage of the task lifecycle." --- -import { Callout } from 'fumadocs-ui/components/callout'; +Extend taskito with custom behavior at every stage of the task lifecycle. - - Content port pending. See the [Zensical source](https://github.com/ByteVeda/taskito/tree/master/docs) for current text. 
- +| Guide | Description | +|---|---| +| [Middleware](/docs/guides/extensibility/middleware) | Hook into task execution with `before`, `after`, `on_retry`, and more | +| [Serializers](/docs/guides/extensibility/serializers) | Custom payload serialization — msgpack, orjson, encryption | +| [Events & Webhooks](/docs/guides/extensibility/events-webhooks) | React to queue events and push notifications to external services | diff --git a/docs-next/content/docs/guides/extensibility/meta.json b/docs-next/content/docs/guides/extensibility/meta.json index e660d66..91b44a5 100644 --- a/docs-next/content/docs/guides/extensibility/meta.json +++ b/docs-next/content/docs/guides/extensibility/meta.json @@ -1,4 +1,4 @@ { "title": "Extensibility", - "pages": ["index"] + "pages": ["index", "middleware", "serializers", "events-webhooks"] } diff --git a/docs-next/content/docs/guides/extensibility/middleware.mdx b/docs-next/content/docs/guides/extensibility/middleware.mdx new file mode 100644 index 0000000..6ca78d1 --- /dev/null +++ b/docs-next/content/docs/guides/extensibility/middleware.mdx @@ -0,0 +1,225 @@ +--- +title: Per-Task Middleware +description: "TaskMiddleware base class with 7 lifecycle hooks — global, per-task, with mutable on_enqueue options." +--- + +import { Callout } from "fumadocs-ui/components/callout"; + +taskito supports a middleware system that lets you run code at key points in +the task lifecycle. Middleware can be applied globally (to all tasks) or +per-task. 
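The mechanics can be pictured as a stack of wrappers around each task call. A minimal sketch of that idea — illustrative only, not taskito's actual implementation — showing `before()` hooks running in registration order and `after()` hooks unwinding in reverse:

```python
class TaskMiddleware:
    # Minimal stand-in for the real base class: no-op hooks.
    def before(self, ctx): pass
    def after(self, ctx, result, error): pass

def run_with_middleware(middlewares, ctx, task, *args, **kwargs):
    for mw in middlewares:            # before() in registration order
        mw.before(ctx)
    result, error = None, None
    try:
        result = task(*args, **kwargs)
    except Exception as exc:
        error = exc
    for mw in reversed(middlewares):  # after() unwinds like a stack
        mw.after(ctx, result, error)
    if error is not None:
        raise error
    return result
```

The stack shape is why a timing middleware registered first measures everything the middleware registered after it does.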
+
+## TaskMiddleware base class
+
+Create middleware by subclassing `TaskMiddleware` and overriding the hooks
+you need:
+
+```python
+from taskito import TaskMiddleware
+
+class LoggingMiddleware(TaskMiddleware):
+    def before(self, ctx):
+        print(f"[START] {ctx.task_name} (job {ctx.id})")
+
+    def after(self, ctx, result, error):
+        status = "OK" if error is None else f"FAILED: {error}"
+        print(f"[END] {ctx.task_name}: {status}")
+
+    def on_retry(self, ctx, error, retry_count):
+        print(f"[RETRY] {ctx.task_name} attempt {retry_count}: {error}")
+```
+
+### Hook signatures
+
+| Hook | Called when |
+|---|---|
+| `before(ctx)` | Before task execution |
+| `after(ctx, result, error)` | After task execution (success or failure) |
+| `on_retry(ctx, error, retry_count)` | A job fails and will be retried |
+| `on_enqueue(task_name, args, kwargs, options)` | A job is about to be enqueued |
+| `on_dead_letter(ctx, error)` | A job exhausts all retries and moves to the DLQ |
+| `on_timeout(ctx)` | A job hits its timeout limit |
+| `on_cancel(ctx)` | A job is cancelled during execution |
+
+The `ctx` parameter is a `JobContext` — the same object as `current_job` —
+providing `ctx.id`, `ctx.task_name`, `ctx.retry_count`, and `ctx.queue_name`.
+
+<Callout>
+  `on_retry`, `on_dead_letter`, `on_timeout`, and `on_cancel` are called by
+  the Rust result handler after the scheduler records the outcome. They fire
+  after `after()` and after the corresponding event is emitted on the event
+  bus. Exceptions raised inside these hooks are logged and do not affect job
+  processing.
+</Callout>
+
+### `on_timeout` — handling timed-out jobs
+
+`on_timeout` fires when the Rust scheduler detects a stale job that
+exceeded its hard `timeout`. Detection happens in the maintenance reaper,
+which periodically scans for jobs still marked as running past their
+deadline.
+
+When a job times out, `on_timeout` is called **before** `on_retry` (if the
+job will be retried) or `on_dead_letter` (if retries are exhausted).

This
+lets you react to the timeout itself independently of whether the job will
+be retried:
+
+```python
+class TimeoutAlerter(TaskMiddleware):
+    def on_timeout(self, ctx):
+        # Fires for every timed-out job, regardless of retry/DLQ outcome
+        logger.error("Job %s (%s) timed out", ctx.id, ctx.task_name)
+
+    def on_retry(self, ctx, error, retry_count):
+        # Fires after on_timeout when the job will be retried
+        logger.warning("Retrying %s (attempt %d)", ctx.task_name, retry_count)
+
+    def on_dead_letter(self, ctx, error):
+        # Fires after on_timeout when retries are exhausted
+        logger.critical("Job %s exhausted retries after timeout", ctx.id)
+```
+
+<Callout>
+  Use `on_timeout` to update dashboards, fire alerts, or record SLA
+  violations. Combine with `on_retry` and `on_dead_letter` for full
+  visibility into the job's fate after the timeout.
+</Callout>
+
+### `on_enqueue` — modifying enqueue parameters
+
+`on_enqueue` is unique: it fires before the job is written to the database,
+and the `options` dict is **mutable**. Modify it to change how the job is
+enqueued:
+
+```python
+class PriorityBoostMiddleware(TaskMiddleware):
+    def on_enqueue(self, task_name, args, kwargs, options):
+        # Bump priority for urgent tasks during business hours
+        import datetime
+        hour = datetime.datetime.now().hour
+        if 9 <= hour < 18 and task_name.startswith("alerts."):
+            options["priority"] = max(options.get("priority", 0), 50)
+```
+
+Keys present in `options`: `priority`, `delay`, `queue`, `max_retries`,
+`timeout`, `unique_key`, `metadata`.
+
+## Queue-level middleware
+
+Apply middleware to **all tasks** by passing it to the `Queue` constructor:
+
+```python
+from taskito import Queue
+
+queue = Queue(middleware=[LoggingMiddleware()])
+```
+
+## Per-task middleware
+
+Apply middleware to a **specific task** using the `middleware` parameter:
+
+```python
+@queue.task(middleware=[MetricsMiddleware()])
+def process(data):
+    ...
+``` + +Per-task middleware runs **after** global middleware, in registration order. + +## Example: metrics middleware + +```python +import time +from taskito import TaskMiddleware + +class MetricsMiddleware(TaskMiddleware): + def before(self, ctx): + ctx._start_time = time.monotonic() + + def after(self, ctx, result, error): + elapsed = time.monotonic() - ctx._start_time + status = "success" if error is None else "failure" + metrics.histogram("task.duration_seconds", elapsed, tags={ + "task": ctx.task_name, + "status": status, + }) +``` + +## Composition and ordering + +### Multiple middleware on the same task + +```python +import time +from taskito import TaskMiddleware + +class TimingMiddleware(TaskMiddleware): + def before(self, ctx): + ctx._start = time.monotonic() + def after(self, ctx, result, error): + elapsed = time.monotonic() - ctx._start + print(f"{ctx.task_name} took {elapsed:.3f}s") + +class LoggingMiddleware(TaskMiddleware): + def before(self, ctx): + print(f"Starting {ctx.task_name}[{ctx.id}]") + def after(self, ctx, result, error): + print(f"Finished {ctx.task_name}[{ctx.id}]") + +@queue.task(middleware=[TimingMiddleware(), LoggingMiddleware()]) +def process(data): + ... +``` + +### Execution order + +1. **Global middleware** (registered via `Queue(middleware=[...])`) runs first +2. **Per-task middleware** (via `@queue.task(middleware=[...])`) runs second +3. Within each group, middleware runs in **registration order** +4. `after()` hooks run in **reverse order** (like a stack) + +### Exception handling + +If a middleware hook raises an exception: + +- **`before()`**: the exception is logged, but subsequent middleware `before()` hooks still run. The task executes normally. +- **`after()`**: the exception is logged. Other `after()` hooks still run. +- **`on_retry()` / `on_dead_letter()`**: logged and swallowed — these are notification hooks, not control flow. + +Middleware exceptions never prevent task execution or result handling. 
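That log-and-swallow behavior amounts to a small guard around every hook invocation. A sketch of the idea — assumed semantics based on the rules above, not taskito's actual code:

```python
import logging

logger = logging.getLogger("middleware")

def call_hook_safely(middleware, hook_name: str, *args) -> bool:
    """Invoke one middleware hook; log any exception instead of
    propagating it. Returns True if the hook completed."""
    try:
        getattr(middleware, hook_name)(*args)
        return True
    except Exception:
        logger.exception("%s.%s raised", type(middleware).__name__, hook_name)
        return False
```

Because each hook call is guarded like this, one faulty middleware cannot break the other middleware in the chain or the task itself.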
+
+## Middleware vs hooks
+
+taskito has two systems for running code around tasks:
+
+| | Hooks (`@queue.on_failure`, etc.) | Middleware (`TaskMiddleware`) |
+|---|---|---|
+| **Scope** | Queue-level only | Queue-level or per-task |
+| **Interface** | Decorated functions | Class with up to 7 hooks |
+| **Context** | Receives `task_name, args, kwargs` | Receives `JobContext` |
+| **Enqueue hook** | No | Yes (`on_enqueue`, can mutate options) |
+| **Retry hook** | No | Yes (`on_retry`) |
+| **DLQ / timeout / cancel hooks** | No | Yes |
+| **Execution order** | After middleware | Before hooks |
+
+Middleware wraps the hook layer: middleware `before()` fires first, then
+`before_task` hooks. On completion, `on_success`/`on_failure` hooks fire
+first, then middleware `after()`.
+
+## Combining with OpenTelemetry
+
+The built-in `OpenTelemetryMiddleware` is itself a `TaskMiddleware`, so it
+composes naturally with your own middleware:
+
+```python
+from taskito import Queue
+from taskito.contrib.otel import OpenTelemetryMiddleware
+
+queue = Queue(middleware=[
+    OpenTelemetryMiddleware(),
+    LoggingMiddleware(),
+])
+```
+
+See the [OpenTelemetry guide](/docs/guides/integrations) for setup details.
diff --git a/docs-next/content/docs/guides/extensibility/serializers.mdx b/docs-next/content/docs/guides/extensibility/serializers.mdx
new file mode 100644
index 0000000..1cdfd8c
--- /dev/null
+++ b/docs-next/content/docs/guides/extensibility/serializers.mdx
@@ -0,0 +1,155 @@
+---
+title: Pluggable Serializers
+description: "CloudpickleSerializer, JsonSerializer, MsgPackSerializer, EncryptedSerializer, custom Serializer protocol."
+---
+
+import { Callout } from "fumadocs-ui/components/callout";
+
+taskito uses a pluggable serializer for task arguments and results. By
+default, it uses `CloudpickleSerializer`, but you can switch to
+`JsonSerializer` or provide your own.
+
+## Built-in serializers
+
+### CloudpickleSerializer (default)
+
+Handles lambdas, closures, and complex Python objects. This is the default
+— no configuration needed.
+
+```python
+from taskito import Queue
+
+queue = Queue()  # uses CloudpickleSerializer
+```
+
+### JsonSerializer
+
+Produces human-readable JSON payloads. Useful for debugging, cross-language
+interop, or when you only pass simple types (strings, numbers, dicts, lists).
+
+```python
+from taskito import Queue, JsonSerializer
+
+queue = Queue(serializer=JsonSerializer())
+```
+
+### MsgPackSerializer
+
+MessagePack serialization: faster than cloudpickle, produces smaller
+payloads, and is cross-language compatible. Requires the `msgpack` package.
+
+```bash
+pip install msgpack
+```
+
+```python
+from taskito.serializers import MsgPackSerializer
+
+queue = Queue(serializer=MsgPackSerializer())
+```
+
+<Callout>
+  `MsgPackSerializer` only handles basic types: dicts, lists, strings,
+  numbers, booleans, and `None`. It does not support lambdas, closures, or
+  arbitrary Python objects. Use `CloudpickleSerializer` when you need to
+  pass complex objects.
+</Callout>
+
+### EncryptedSerializer
+
+AES-256-GCM encryption for task arguments and results. Payloads stored in
+the database are opaque ciphertext — only the key holder can read them.
+Requires the `cryptography` package.
+
+```bash
+pip install cryptography
+```
+
+```python
+import os
+from taskito.serializers import EncryptedSerializer
+
+queue = Queue(serializer=EncryptedSerializer(key=os.environ["QUEUE_KEY"]))
+```
+
+The key must be exactly 32 bytes, base64-encoded. Generate one with:
+
+```bash
+python -c "import os, base64; print(base64.b64encode(os.urandom(32)).decode())"
+```
+
+By default, `EncryptedSerializer` wraps `CloudpickleSerializer`.
To wrap a
+different serializer:
+
+```python
+from taskito.serializers import EncryptedSerializer, MsgPackSerializer
+
+queue = Queue(serializer=EncryptedSerializer(key=key, inner=MsgPackSerializer()))
+```
+
+## When to use each
+
+| | CloudpickleSerializer | JsonSerializer | MsgPackSerializer | EncryptedSerializer |
+|---|---|---|---|---|
+| **Complex objects** | Yes | No | No | Depends on inner serializer |
+| **Debugging** | Binary payloads (opaque) | Human-readable JSON | Binary (opaque) | Ciphertext (opaque) |
+| **Cross-language** | Python only | Any language | Any language | Python only (by default) |
+| **Performance** | Good | Good for simple types | Best | Adds encryption overhead |
+| **Security** | None | None | None | AES-256-GCM |
+| **Extra dependency** | No | No | `msgpack` | `cryptography` |
+| **Default** | Yes | No | No | No |
+
+**Rule of thumb**: use `CloudpickleSerializer` (default) unless you have a
+specific reason to switch. Use `EncryptedSerializer` when tasks carry
+sensitive data that must not be readable in the database.
+
+## Custom serializers
+
+Implement the `Serializer` protocol with two methods:
+
+```python
+import msgpack
+
+from taskito import Serializer
+
+class CustomMsgpackSerializer:
+    def dumps(self, obj) -> bytes:
+        return msgpack.packb(obj)
+
+    def loads(self, data: bytes):
+        return msgpack.unpackb(data, raw=False)
+
+queue = Queue(serializer=CustomMsgpackSerializer())
+```
+
+The protocol requires:
+
+| Method | Signature | Description |
+|---|---|---|
+| `dumps` | `(obj: Any) -> bytes` | Serialize a Python object to bytes |
+| `loads` | `(data: bytes) -> Any` | Deserialize bytes back to a Python object |
+
+The serializer is used for both task arguments (`(args, kwargs)` tuple) and
+return values.
+
+<Callout>
+  `job.result()` uses the queue's configured serializer for
+  deserialization. If you're using `JsonSerializer` or a custom serializer,
+  results are deserialized with that serializer rather than with hardcoded
+  cloudpickle.
+</Callout>
+
+## Configuration
+
+Pass the serializer to the `Queue` constructor:
+
+```python
+queue = Queue(
+    db_path="myapp.db",
+    serializer=JsonSerializer(),
+)
+```
+
+All tasks on the queue use the same serializer. The serializer must be
+consistent between the enqueuing code and the worker — using different
+serializers will cause deserialization failures.
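To see why consistency matters, here is a standalone sketch that round-trips a payload through matching serializers and then attempts a mismatched read. It uses stand-in classes built on only the stdlib `json` and `pickle` modules; taskito's own serializers behave analogously:

```python
import json
import pickle

class StdlibJsonSerializer:
    def dumps(self, obj) -> bytes:
        return json.dumps(obj).encode()

    def loads(self, data: bytes):
        return json.loads(data.decode())

class StdlibPickleSerializer:
    def dumps(self, obj) -> bytes:
        return pickle.dumps(obj)

    def loads(self, data: bytes):
        return pickle.loads(data)

# Enqueue side: arguments serialized as an (args, kwargs) pair.
blob = StdlibJsonSerializer().dumps([["report.pdf"], {"priority": 5}])

# Matching serializer on the worker side round-trips cleanly.
assert StdlibJsonSerializer().loads(blob) == [["report.pdf"], {"priority": 5}]

# A mismatched serializer fails at deserialization time.
try:
    StdlibPickleSerializer().loads(blob)
except pickle.UnpicklingError:
    print("mismatched serializers fail to deserialize")
```

The failure surfaces only when the worker deserializes the job, which is why a serializer change must be rolled out to producers and workers together.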