Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
285 changes: 285 additions & 0 deletions docs-next/content/docs/guides/extensibility/events-webhooks.mdx
Original file line number Diff line number Diff line change
@@ -0,0 +1,285 @@
---
title: Events & Webhooks
description: "In-process event bus and HMAC-signed HTTP webhooks for job and worker lifecycle events."
---

taskito includes an in-process event bus and webhook delivery system for
reacting to job lifecycle events.

## Event types

The `EventType` enum defines all available lifecycle events:

| Event | Fired when | Payload fields |
|-------|------------|----------------|
| `JOB_ENQUEUED` | A job is added to the queue | `job_id`, `task_name`, `queue` |
| `JOB_COMPLETED` | A job finishes successfully | `job_id`, `task_name`, `queue` |
| `JOB_FAILED` | A job raises an exception (before retry) | `job_id`, `task_name`, `queue`, `error` |
| `JOB_RETRYING` | A failed job will be retried | `job_id`, `task_name`, `error`, `retry_count` |
| `JOB_DEAD` | A job exhausts all retries and enters the DLQ | `job_id`, `task_name`, `error` |
| `JOB_CANCELLED` | A job is cancelled | `job_id`, `task_name` |
| `WORKER_STARTED` | A worker process/thread comes online | `worker_id`, `hostname` |
| `WORKER_STOPPED` | A worker process/thread shuts down | `worker_id`, `hostname` |
| `WORKER_ONLINE` | Worker registered in storage (visible to fleet) | `worker_id`, `queues`, `pool` |
| `WORKER_OFFLINE` | Dead worker reaped (no heartbeat for 30s) | `worker_id` |
| `WORKER_UNHEALTHY` | Resource health transitions to unhealthy | `worker_id`, `resources` |
| `QUEUE_PAUSED` | A named queue is paused | `queue` |
| `QUEUE_RESUMED` | A paused queue is resumed | `queue` |

`JOB_RETRYING`, `JOB_DEAD`, and `JOB_CANCELLED` are emitted by the Rust
result handler immediately after the scheduler records the outcome.
Middleware hooks (`on_retry`, `on_dead_letter`, `on_cancel`) are called in
the same result-handling pass, after the event fires.

`QUEUE_PAUSED` and `QUEUE_RESUMED` are emitted synchronously by
`queue.pause()` and `queue.resume()` after the queue state is written to
storage.

## Registering listeners

Use `queue.on_event()` to subscribe a callback to a specific event type:

```python
from taskito import Queue
from taskito.events import EventType

queue = Queue(db_path="tasks.db")

def on_failure(event_type: EventType, payload: dict):
print(f"Job {payload['job_id']} failed: {payload.get('error')}")

queue.on_event(EventType.JOB_FAILED, on_failure)
```

### Callback signature

All callbacks receive two arguments:

- `event_type` (`EventType`) — the event that occurred
- `payload` (`dict`) — event details including `job_id`, `task_name`, `queue`, and event-specific fields

### Async delivery

Callbacks are dispatched asynchronously in a `ThreadPoolExecutor`. The
thread pool size defaults to 4 and can be configured via
`Queue(event_workers=N)`. This means:

- Callbacks never block the worker
- Exceptions in callbacks are logged but do not affect job processing
- Callbacks may execute slightly after the event occurs

## Webhooks

For external systems, register webhook URLs to receive HTTP POST requests
on job events.

### Registering a webhook

```python
queue.add_webhook(
url="https://example.com/hooks/taskito",
events=[EventType.JOB_FAILED, EventType.JOB_DEAD],
headers={"Authorization": "Bearer mytoken"},
secret="my-signing-secret",
)
```

| Parameter | Type | Default | Description |
|-----------|------|---------|-------------|
| `url` | `str` | — | URL to POST event payloads to (must be `http://` or `https://`) |
| `events` | `list[EventType] \| None` | `None` | Event types to subscribe to. `None` means all events |
| `headers` | `dict[str, str] \| None` | `None` | Extra HTTP headers to include in requests |
| `secret` | `str \| None` | `None` | HMAC-SHA256 signing secret |
| `max_retries` | `int` | `3` | Maximum delivery attempts |
| `timeout` | `float` | `10.0` | HTTP request timeout in seconds |
| `retry_backoff` | `float` | `2.0` | Base for exponential backoff between retries |

### HMAC signing

When a `secret` is provided, each webhook request includes an
`X-Taskito-Signature` header:

```
X-Taskito-Signature: sha256=<hex digest>
```

The signature is computed over the JSON request body using HMAC-SHA256.
Verify it on the receiving end:

```python
import hashlib
import hmac

def verify_signature(body: bytes, signature: str, secret: str) -> bool:
expected = hmac.new(secret.encode(), body, hashlib.sha256).hexdigest()
return hmac.compare_digest(f"sha256={expected}", signature)
```

### Retry behavior

Failed webhook deliveries are retried with exponential backoff. The number
of attempts, request timeout, and backoff base are configurable per webhook
via `max_retries`, `timeout`, and `retry_backoff`. With the defaults
(`max_retries=3`, `retry_backoff=2.0`):

| Attempt | Delay before next retry |
|---------|------------------------|
| 1st retry | 1 second (`2.0 ** 0`) |
| 2nd retry | 2 seconds (`2.0 ** 1`) |
| 3rd retry | — (final) |

4xx responses are not retried. If all attempts fail, a warning is logged
and the event is dropped.

### Event filtering

Subscribe to specific events or all events:

```python
# Only failure events
queue.add_webhook(
url="https://slack.example.com/webhook",
events=[EventType.JOB_FAILED, EventType.JOB_DEAD],
)

# All events
queue.add_webhook(url="https://monitoring.example.com/events")
```

## Examples

### Slack notification on job failure

```python
import requests
from taskito.events import EventType

def notify_slack(event_type: EventType, payload: dict):
requests.post(
"https://hooks.slack.com/services/T.../B.../xxx",
json={
"text": f":x: Task `{payload['task_name']}` failed\n"
f"Job ID: `{payload['job_id']}`\n"
f"Error: {payload.get('error', 'unknown')}"
},
)

queue.on_event(EventType.JOB_FAILED, notify_slack)
queue.on_event(EventType.JOB_DEAD, notify_slack)
```

### Webhook to external monitoring

```python
queue.add_webhook(
url="https://monitoring.example.com/api/taskito-events",
events=[EventType.JOB_COMPLETED, EventType.JOB_FAILED, EventType.JOB_DEAD],
secret="whsec_abc123",
headers={"X-Source": "taskito-prod"},
)
```

The monitoring service receives JSON payloads like:

```json
{
"event": "job.failed",
"job_id": "01H5K6X...",
"task_name": "myapp.tasks.process",
"queue": "default",
"error": "ConnectionError: ..."
}
```

### Job completion tracking

```python
from taskito.events import EventType

completed_count = 0

def track_completion(event_type: EventType, payload: dict):
global completed_count
completed_count += 1
if completed_count % 100 == 0:
print(f"Milestone: {completed_count} jobs completed")

queue.on_event(EventType.JOB_COMPLETED, track_completion)
```

### Database logging for audit trail

```python
from taskito.events import EventType

def audit_log(event_type: EventType, payload: dict):
db.execute(
"INSERT INTO audit_log (event, job_id, task_name, timestamp) VALUES (?, ?, ?, ?)",
(event_type.value, payload["job_id"], payload["task_name"], time.time()),
)

# Subscribe to all important events
for event in [EventType.JOB_ENQUEUED, EventType.JOB_COMPLETED, EventType.JOB_FAILED, EventType.JOB_DEAD]:
queue.on_event(event, audit_log)
```

## Event ordering

Events fire in the order the scheduler processes results — typically the
order jobs complete. For jobs that complete nearly simultaneously, ordering
is **not guaranteed** across different workers or threads.

Within a single job's lifecycle, events always fire in this order:

1. `JOB_ENQUEUED` (at enqueue time)
2. `JOB_COMPLETED` / `JOB_FAILED` / `JOB_CANCELLED` (at completion)
3. `JOB_RETRYING` (if retried, before the next attempt)
4. `JOB_DEAD` (if all retries exhausted)

## Backpressure

Events are dispatched to a thread pool (default size: 4, configurable via
`event_workers=N`). If callbacks are slow and events arrive faster than
they can be processed, they queue in memory.

For high-volume event scenarios:

```python
queue = Queue(event_workers=16) # More threads for slow callbacks
```

If a callback raises an exception, it is logged and the event is dropped —
it does not retry or block other callbacks.

## Webhook failure

Webhooks retry with exponential backoff (up to `max_retries`). After all
retries are exhausted, the webhook delivery is **logged and dropped** —
there is no dead-letter queue for webhooks. Monitor webhook failures via
the `on_failure` callback or structured logging.

### Webhook receiver (Flask)

A minimal Flask app that receives and verifies taskito webhooks:

```python
from flask import Flask, request, abort
import hashlib, hmac

app = Flask(__name__)
WEBHOOK_SECRET = "my-signing-secret"

@app.route("/hooks/taskito", methods=["POST"])
def receive_webhook():
signature = request.headers.get("X-Taskito-Signature", "")
expected = hmac.new(
WEBHOOK_SECRET.encode(), request.data, hashlib.sha256
).hexdigest()

if not hmac.compare_digest(f"sha256={expected}", signature):
abort(401)

event = request.json
print(f"Received event: {event['event']} for job {event['job_id']}")
return "", 204
```
12 changes: 7 additions & 5 deletions docs-next/content/docs/guides/extensibility/index.mdx
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
---
title: Extensibility
description: "Middleware, custom serializers, interception."
description: "Extend taskito at every stage of the task lifecycle."
---

import { Callout } from 'fumadocs-ui/components/callout';
Extend taskito with custom behavior at every stage of the task lifecycle.

<Callout title="Phase 1 stub" type="info">
Content port pending. See the [Zensical source](https://github.com/ByteVeda/taskito/tree/master/docs) for current text.
</Callout>
| Guide | Description |
|---|---|
| [Middleware](/docs/guides/extensibility/middleware) | Hook into task execution with `before`, `after`, `on_retry`, and more |
| [Serializers](/docs/guides/extensibility/serializers) | Custom payload serialization — msgpack, orjson, encryption |
| [Events & Webhooks](/docs/guides/extensibility/events-webhooks) | React to queue events and push notifications to external services |
2 changes: 1 addition & 1 deletion docs-next/content/docs/guides/extensibility/meta.json
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
{
"title": "Extensibility",
"pages": ["index"]
"pages": ["index", "middleware", "serializers", "events-webhooks"]
}
Loading
Loading