From 0bf8ab89cb4f2d921cf2761f5762c08773dfff06 Mon Sep 17 00:00:00 2001 From: Pratyush Sharma <56130065+pratyush618@users.noreply.github.com> Date: Sun, 3 May 2026 02:44:56 +0530 Subject: [PATCH] docs(docs-next): port guides/integrations (phase 4i) --- .../docs/guides/integrations/django.mdx | 159 ++++++++++++++ .../docs/guides/integrations/fastapi.mdx | 207 ++++++++++++++++++ .../docs/guides/integrations/flask.mdx | 179 +++++++++++++++ .../docs/guides/integrations/index.mdx | 55 ++++- .../docs/guides/integrations/meta.json | 10 +- .../content/docs/guides/integrations/otel.mdx | 110 ++++++++++ .../docs/guides/integrations/prometheus.mdx | 198 +++++++++++++++++ .../docs/guides/integrations/sentry.mdx | 136 ++++++++++++ 8 files changed, 1048 insertions(+), 6 deletions(-) create mode 100644 docs-next/content/docs/guides/integrations/django.mdx create mode 100644 docs-next/content/docs/guides/integrations/fastapi.mdx create mode 100644 docs-next/content/docs/guides/integrations/flask.mdx create mode 100644 docs-next/content/docs/guides/integrations/otel.mdx create mode 100644 docs-next/content/docs/guides/integrations/prometheus.mdx create mode 100644 docs-next/content/docs/guides/integrations/sentry.mdx diff --git a/docs-next/content/docs/guides/integrations/django.mdx b/docs-next/content/docs/guides/integrations/django.mdx new file mode 100644 index 0000000..cdb1a34 --- /dev/null +++ b/docs-next/content/docs/guides/integrations/django.mdx @@ -0,0 +1,159 @@ +--- +title: Django Integration +description: "Admin views, settings, autodiscovery, management commands — no Django ORM models required." +--- + +import { Callout } from "fumadocs-ui/components/callout"; + +taskito provides Django admin views for browsing jobs, inspecting dead +letters, and viewing queue statistics — all without Django ORM models. 
+ +## Installation + +```bash +pip install taskito[django] +``` + +## Setup + +### Option 1: register on the default admin site + +In your project's `admin.py` or `urls.py`: + +```python +from taskito.contrib.django.admin import register_taskito_admin + +register_taskito_admin() +``` + +This adds taskito views to the default `admin.site`. + +### Option 2: custom admin site + +Use `TaskitoAdminSite` for a dedicated admin: + +```python +from taskito.contrib.django.admin import TaskitoAdminSite + +admin_site = TaskitoAdminSite(name="taskito_admin") +``` + +## Django settings + +The following settings can be defined in your Django `settings.py`: + +| Setting | Default | Description | +|---------|---------|-------------| +| `TASKITO_AUTODISCOVER_MODULE` | `"tasks"` | Module name auto-discovered in each installed app on startup. | +| `TASKITO_ADMIN_PER_PAGE` | `50` | Rows per page in the admin jobs and dead letters views. | +| `TASKITO_ADMIN_TITLE` | `"Taskito"` | Browser tab title for `TaskitoAdminSite`. | +| `TASKITO_ADMIN_HEADER` | `"Taskito Admin"` | Site header shown in `TaskitoAdminSite`. | +| `TASKITO_WATCH_INTERVAL` | `2` | Polling interval in seconds for `manage.py taskito_info --watch`. | +| `TASKITO_DASHBOARD_HOST` | `"127.0.0.1"` | Default bind host for `manage.py taskito_dashboard`. | +| `TASKITO_DASHBOARD_PORT` | `8080` | Default bind port for `manage.py taskito_dashboard`. | + +Example: + +```python +# settings.py +TASKITO_AUTODISCOVER_MODULE = "jobs" # import myapp.jobs instead of myapp.tasks +TASKITO_ADMIN_PER_PAGE = 25 +TASKITO_ADMIN_TITLE = "MyApp Tasks" +TASKITO_ADMIN_HEADER = "MyApp Task Queue" +TASKITO_DASHBOARD_HOST = "0.0.0.0" +TASKITO_DASHBOARD_PORT = 9000 +``` + +## Queue configuration + +Create a `taskito` queue instance in your Django project. The +`get_queue()` function in `taskito.contrib.django.settings` is used to +retrieve the queue instance. 
+ +```python +# myproject/tasks.py +from taskito import Queue + +queue = Queue(db_path="taskito.db") + +@queue.task() +def send_welcome_email(user_id: int): + from myapp.models import User + user = User.objects.get(id=user_id) + user.email_user("Welcome!", "Thanks for signing up.") +``` + + + Import Django models inside the task function body to avoid app registry + issues during startup. + + +## Admin views + +The integration provides the following views under `/admin/taskito/`: + +- **Dashboard** — Queue statistics overview (pending, running, completed, failed, dead, cancelled) +- **Jobs** — Paginated job list with status, queue, and task name filters +- **Job Detail** — Full job payload, error history, retry count, and metadata +- **Dead Letters** — Browse and retry dead letter entries + +## Running the worker + +```bash +DJANGO_SETTINGS_MODULE=myproject.settings taskito worker --app myproject.tasks:queue +``` + +## Full example + +### Project structure + +``` +myproject/ + settings.py + urls.py + tasks.py # Queue + task definitions + myapp/ + admin.py # Register taskito admin views + views.py + models.py +``` + +### `tasks.py` + +```python +from taskito import Queue + +queue = Queue(db_path="taskito.db") + +@queue.task(max_retries=3) +def send_welcome_email(user_id: int): + from myapp.models import User + user = User.objects.get(id=user_id) + user.email_user("Welcome!", "Thanks for signing up.") + +@queue.task(rate_limit="60/h") +def generate_monthly_report(month: int, year: int): + from myapp.reports import build_report + return build_report(month, year) +``` + +### `myapp/admin.py` + +```python +from django.contrib import admin +from taskito.contrib.django.admin import register_taskito_admin + +register_taskito_admin() +``` + +### `myapp/views.py` + +```python +from django.http import JsonResponse +from myproject.tasks import send_welcome_email + +def signup_view(request): + user = create_user(request.POST) + send_welcome_email.delay(user.id) + return 
JsonResponse({"status": "ok"}) +``` diff --git a/docs-next/content/docs/guides/integrations/fastapi.mdx b/docs-next/content/docs/guides/integrations/fastapi.mdx new file mode 100644 index 0000000..bc72936 --- /dev/null +++ b/docs-next/content/docs/guides/integrations/fastapi.mdx @@ -0,0 +1,207 @@ +--- +title: FastAPI Integration +description: "TaskitoRouter — pre-built APIRouter with job status, SSE progress, dead letter management." +--- + +taskito provides a pre-built `APIRouter` for FastAPI with endpoints for +job management, progress streaming via SSE, and dead letter queue +operations. + +## Installation + +```bash +pip install taskito[fastapi] +``` + +This installs `fastapi` and `pydantic` as extras. + +## Quick setup + +```python +from fastapi import FastAPI +from taskito import Queue +from taskito.contrib.fastapi import TaskitoRouter + +queue = Queue(db_path="myapp.db") + +@queue.task() +def process_data(payload: dict) -> str: + return "done" + +app = FastAPI() +app.include_router(TaskitoRouter(queue), prefix="/tasks") +``` + +Run with: + +```bash +uvicorn myapp:app --reload +``` + +## Endpoints + +| Method | Path | Description | +|--------|------|-------------| +| `GET` | `/stats` | Queue statistics | +| `GET` | `/stats/queues` | Per-queue statistics | +| `GET` | `/jobs/{job_id}` | Job status, progress, and metadata | +| `GET` | `/jobs/{job_id}/errors` | Error history for a job | +| `GET` | `/jobs/{job_id}/result` | Job result (optional `?timeout=N` for blocking) | +| `GET` | `/jobs/{job_id}/progress` | SSE stream of progress updates | +| `POST` | `/jobs/{job_id}/cancel` | Cancel a pending job | +| `GET` | `/dead-letters` | List dead letter entries (paginated) | +| `POST` | `/dead-letters/{dead_id}/retry` | Re-enqueue a dead letter | +| `GET` | `/health` | Liveness check | +| `GET` | `/readiness` | Readiness check | +| `GET` | `/resources` | Worker resource status | + +## Configuration + +`TaskitoRouter` accepts options to control which routes are 
registered,
how results are serialized, and page sizes:

```python
from fastapi import Depends, Header, HTTPException
from taskito.contrib.fastapi import TaskitoRouter

def require_api_key(x_api_key: str = Header(...)):
    if x_api_key != "secret":
        raise HTTPException(status_code=403)

app.include_router(
    TaskitoRouter(
        queue,
        include_routes={"stats", "jobs", "dead-letters", "retry-dead"},
        dependencies=[Depends(require_api_key)],
        sse_poll_interval=1.0,
        result_timeout=5.0,
        default_page_size=25,
        max_page_size=200,
        result_serializer=lambda v: v if isinstance(v, (str, int, float, bool, type(None))) else str(v),
    ),
    prefix="/tasks",
)
```

| Parameter | Type | Default | Description |
|---|---|---|---|
| `include_routes` | `set[str] \| None` | `None` | If set, only register these route names. Cannot be combined with `exclude_routes`. |
| `exclude_routes` | `set[str] \| None` | `None` | If set, skip these route names. Cannot be combined with `include_routes`. |
| `dependencies` | `Sequence[Depends] \| None` | `None` | FastAPI dependencies applied to every route (e.g. auth). |
| `sse_poll_interval` | `float` | `0.5` | Seconds between SSE progress polls. |
| `result_timeout` | `float` | `1.0` | Default timeout for non-blocking result fetch. |
| `default_page_size` | `int` | `20` | Default page size for paginated endpoints. |
| `max_page_size` | `int` | `100` | Maximum allowed page size. |
| `result_serializer` | `Callable[[Any], Any] \| None` | `None` | Custom result serializer. Receives any value, must return a JSON-serializable value. |

Valid route names: `stats`, `jobs`, `job-errors`, `job-result`,
`job-progress`, `cancel`, `dead-letters`, `retry-dead`, `health`,
`readiness`, `resources`, `queue-stats`.

## Blocking result fetch

The `/jobs/{job_id}/result` endpoint supports an optional `timeout` query
parameter (0–300 seconds).
When `timeout > 0`, the request blocks until +the job completes or the timeout elapses: + +```bash +# Non-blocking (default) +curl http://localhost:8000/tasks/jobs/01H5K6X.../result + +# Block up to 30 seconds for the result +curl http://localhost:8000/tasks/jobs/01H5K6X.../result?timeout=30 +``` + +## SSE progress streaming + +Stream real-time progress for a running job using Server-Sent Events: + +```python +import httpx + +with httpx.stream("GET", "http://localhost:8000/tasks/jobs/01H5K6X.../progress") as r: + for line in r.iter_lines(): + print(line) + # data: {"progress": 25, "status": "running"} + # data: {"progress": 50, "status": "running"} + # data: {"progress": 100, "status": "completed"} +``` + +From the browser: + +```javascript +const source = new EventSource("/tasks/jobs/01H5K6X.../progress"); +source.onmessage = (event) => { + const data = JSON.parse(event.data); + console.log(`Progress: ${data.progress}%`); + if (data.status === "completed" || data.status === "failed") { + source.close(); + } +}; +``` + +The stream sends a JSON event every 0.5 seconds while the job is active, +then a final event when the job reaches a terminal state. + +## Pydantic response models + +All endpoints return validated Pydantic models with clean OpenAPI docs. 
+You can import them for type-safe client code: + +```python +from taskito.contrib.fastapi import ( + StatsResponse, + JobResponse, + JobErrorResponse, + JobResultResponse, + CancelResponse, + DeadLetterResponse, + RetryResponse, +) +``` + +## Full example + +```python +from fastapi import FastAPI, Header, HTTPException, Depends +from taskito import Queue, current_job +from taskito.contrib.fastapi import TaskitoRouter + +queue = Queue(db_path="myapp.db") + +@queue.task() +def resize_image(image_url: str, sizes: list[int]) -> dict: + results = {} + for i, size in enumerate(sizes): + results[size] = do_resize(image_url, size) + current_job.update_progress(int((i + 1) / len(sizes) * 100)) + return results + +async def require_auth(authorization: str = Header(...)): + if not authorization.startswith("Bearer "): + raise HTTPException(401) + +app = FastAPI(title="Image Service") +app.include_router( + TaskitoRouter(queue, dependencies=[Depends(require_auth)]), + prefix="/tasks", + tags=["tasks"], +) + +# Start worker in a separate process: +# taskito worker --app myapp:queue +``` + +```bash +# Check job status +curl http://localhost:8000/tasks/jobs/01H5K6X... \ + -H "Authorization: Bearer mytoken" + +# Stream progress +curl -N http://localhost:8000/tasks/jobs/01H5K6X.../progress \ + -H "Authorization: Bearer mytoken" + +# Block for result (up to 60s) +curl http://localhost:8000/tasks/jobs/01H5K6X.../result?timeout=60 \ + -H "Authorization: Bearer mytoken" +``` diff --git a/docs-next/content/docs/guides/integrations/flask.mdx b/docs-next/content/docs/guides/integrations/flask.mdx new file mode 100644 index 0000000..1f6afe1 --- /dev/null +++ b/docs-next/content/docs/guides/integrations/flask.mdx @@ -0,0 +1,179 @@ +--- +title: Flask Integration +description: "Taskito(app) extension with config-driven Queue setup and CLI commands." +--- + +taskito provides a first-class Flask extension that configures a `Queue` +from your app config and registers CLI commands. 
+ +## Installation + +```bash +pip install taskito[flask] +``` + +## Basic setup + +```python +from flask import Flask +from taskito.contrib.flask import Taskito + +app = Flask(__name__) +app.config["TASKITO_DB_PATH"] = "myapp.db" + +taskito = Taskito(app) + +@taskito.queue.task() +def send_email(to: str, subject: str): + ... +``` + +## Factory pattern + +```python +from taskito.contrib.flask import Taskito + +taskito = Taskito() + +def create_app(): + app = Flask(__name__) + app.config["TASKITO_DB_PATH"] = "myapp.db" + taskito.init_app(app) + return app +``` + +## Configuration + +All configuration is read from `app.config`: + +| Config Key | Default | Description | +|------------|---------|-------------| +| `TASKITO_DB_PATH` | `.taskito/taskito.db` | SQLite database path | +| `TASKITO_BACKEND` | `"sqlite"` | Storage backend: `"sqlite"` or `"postgres"` | +| `TASKITO_DB_URL` | `None` | PostgreSQL connection URL (when backend is `"postgres"`) | +| `TASKITO_WORKERS` | `0` (auto) | Number of worker threads | +| `TASKITO_SCHEMA` | `"taskito"` | PostgreSQL schema name | +| `TASKITO_DEFAULT_RETRY` | `3` | Default retry count for tasks | +| `TASKITO_DEFAULT_TIMEOUT` | `300` | Default timeout in seconds | +| `TASKITO_DEFAULT_PRIORITY` | `0` | Default task priority | +| `TASKITO_RESULT_TTL` | `None` | Auto-purge completed jobs after N seconds | +| `TASKITO_DRAIN_TIMEOUT` | `30` | Seconds to wait for running tasks on shutdown | + +## Extension options + +The `Taskito` constructor accepts a `cli_group` parameter to rename the CLI +command group: + +```python +# Commands will be under `flask tasks worker`, `flask tasks info`, etc. 
+taskito = Taskito(app, cli_group="tasks") +``` + +## CLI commands + +The extension registers commands under the `flask taskito` group +(configurable via `cli_group`): + +### `flask taskito worker` + +Start a taskito worker: + +```bash +flask taskito worker +flask taskito worker --queues default,emails +``` + +### `flask taskito info` + +Show queue statistics. Supports `--format table` (default) and +`--format json`: + +```bash +flask taskito info +flask taskito info --format json +``` + +``` +taskito queue statistics +------------------------------ + pending 12 + running 3 + completed 450 + failed 2 + dead 1 + cancelled 0 +------------------------------ + total 468 +``` + +## Accessing the queue + +```python +# Via the extension instance +taskito.queue.stats() + +# Via app extensions +app.extensions["taskito"].queue.stats() +``` + +## Full example + +A complete Flask application with task definitions and routes: + +```python +from flask import Flask, jsonify, request +from taskito.contrib.flask import Taskito + +app = Flask(__name__) +app.config["TASKITO_DB_PATH"] = "myapp.db" +app.config["TASKITO_DEFAULT_RETRY"] = 3 +app.config["TASKITO_RESULT_TTL"] = 86400 # 24h auto-cleanup + +taskito = Taskito(app) + +@taskito.queue.task() +def send_welcome_email(user_id: int): + """Send a welcome email to a new user.""" + user = get_user(user_id) + send_email(user.email, "Welcome!", "Thanks for signing up.") + +@taskito.queue.task(rate_limit="10/m") +def generate_report(report_type: str, params: dict): + """Generate a report (rate-limited to 10/minute).""" + return create_report(report_type, params) + +@app.route("/api/users", methods=["POST"]) +def create_user(): + user = create_user_in_db(request.json) + send_welcome_email.delay(user.id) + return jsonify({"id": user.id}), 201 + +@app.route("/api/reports", methods=["POST"]) +def request_report(): + job = generate_report.delay( + request.json["type"], + request.json.get("params", {}), + ) + return jsonify({"job_id": job.id}), 
202 + +@app.route("/api/reports/") +def report_status(job_id: str): + job = taskito.queue.get_job(job_id) + if job is None: + return jsonify({"error": "Not found"}), 404 + return jsonify({"status": job.status, "progress": job.progress}) + +@app.route("/api/queue/stats") +def queue_stats(): + return jsonify(taskito.queue.stats()) +``` + +Run the app and worker: + +```bash +# Terminal 1: Flask app +flask run + +# Terminal 2: Worker +flask taskito worker +``` diff --git a/docs-next/content/docs/guides/integrations/index.mdx b/docs-next/content/docs/guides/integrations/index.mdx index ad48740..ef42bee 100644 --- a/docs-next/content/docs/guides/integrations/index.mdx +++ b/docs-next/content/docs/guides/integrations/index.mdx @@ -1,10 +1,55 @@ --- title: Integrations -description: "Flask, Django, FastAPI, OTel, Sentry, Prometheus." +description: "Flask, FastAPI, Django, OpenTelemetry, Prometheus, Sentry, encryption, msgpack, Postgres, Redis." --- -import { Callout } from 'fumadocs-ui/components/callout'; +taskito offers optional extras for popular frameworks and observability +tools. Install only what you need. - - Content port pending. See the [Zensical source](https://github.com/ByteVeda/taskito/tree/master/docs) for current text. 
- +## Available integrations + +| Extra | Install | What you get | +|-------|---------|--------------| +| **Flask** | `pip install taskito[flask]` | `Taskito(app)` extension, `flask taskito worker` CLI | +| **FastAPI** | `pip install taskito[fastapi]` | `TaskitoRouter` for instant REST API over the queue | +| **Django** | `pip install taskito[django]` | Admin integration, management commands | +| **OpenTelemetry** | `pip install taskito[otel]` | Distributed tracing with span-per-task | +| **Prometheus** | `pip install taskito[prometheus]` | `PrometheusMiddleware`, queue depth gauges, `/metrics` server | +| **Sentry** | `pip install taskito[sentry]` | `SentryMiddleware` with auto error capture and task tags | +| **Encryption** | `pip install taskito[encryption]` | `EncryptedSerializer` for at-rest payload encryption | +| **MsgPack** | `pip install taskito[msgpack]` | `MsgpackSerializer` for compact binary serialization | +| **Postgres** | `pip install taskito[postgres]` | Multi-machine workers via PostgreSQL backend | +| **Redis** | `pip install taskito[redis]` | Redis storage backend | + +## Framework integrations + +- **[Flask](/docs/guides/integrations/flask)** — full Flask extension with app config, factory pattern, and CLI commands +- **[FastAPI](/docs/guides/integrations/fastapi)** — pre-built `APIRouter` with job status, SSE progress, and DLQ management +- **[Django](/docs/guides/integrations/django)** — admin views for browsing jobs, dead letters, and queue stats + +## Observability integrations + +- **[OpenTelemetry](/docs/guides/integrations/otel)** — distributed tracing with per-task spans +- **[Prometheus](/docs/guides/integrations/prometheus)** — counters, histograms, and gauges for task execution +- **[Sentry](/docs/guides/integrations/sentry)** — automatic error capture with task context + +## Combining integrations + +All middleware-based integrations (`OpenTelemetryMiddleware`, +`PrometheusMiddleware`, `SentryMiddleware`) compose together: + 
+```python +from taskito import Queue +from taskito.contrib.otel import OpenTelemetryMiddleware +from taskito.contrib.prometheus import PrometheusMiddleware +from taskito.contrib.sentry import SentryMiddleware + +queue = Queue( + db_path="myapp.db", + middleware=[ + OpenTelemetryMiddleware(), + PrometheusMiddleware(), + SentryMiddleware(), + ], +) +``` diff --git a/docs-next/content/docs/guides/integrations/meta.json b/docs-next/content/docs/guides/integrations/meta.json index b277815..21f9337 100644 --- a/docs-next/content/docs/guides/integrations/meta.json +++ b/docs-next/content/docs/guides/integrations/meta.json @@ -1,4 +1,12 @@ { "title": "Integrations", - "pages": ["index"] + "pages": [ + "index", + "flask", + "fastapi", + "django", + "otel", + "prometheus", + "sentry" + ] } diff --git a/docs-next/content/docs/guides/integrations/otel.mdx b/docs-next/content/docs/guides/integrations/otel.mdx new file mode 100644 index 0000000..1c42c00 --- /dev/null +++ b/docs-next/content/docs/guides/integrations/otel.mdx @@ -0,0 +1,110 @@ +--- +title: OpenTelemetry Integration +description: "Distributed tracing with span-per-task — exporters, customization, composition." +--- + +import { Callout } from "fumadocs-ui/components/callout"; + +taskito provides optional OpenTelemetry support for distributed tracing of +task execution. + +## Installation + +Install with the `otel` extra: + +```bash +pip install taskito[otel] +``` + +This installs `opentelemetry-api` as a dependency. 
## Setup

Add `OpenTelemetryMiddleware` to your queue:

```python
from taskito import Queue
from taskito.contrib.otel import OpenTelemetryMiddleware

queue = Queue(middleware=[OpenTelemetryMiddleware()])
```

## What gets traced

Each task execution produces a span with:

- **Span name**: `taskito.execute.{task_name}` (customizable)
- **Attributes**:
  - `taskito.job_id` — the job ID
  - `taskito.task_name` — the registered task name
  - `taskito.queue` — the queue name
  - `taskito.retry_count` — current retry attempt
- **Status**: `OK` on success, `ERROR` on failure (with exception recorded)
- **Events**: a `retry` event is added when a task is about to be retried

## Configuration with exporters

`OpenTelemetryMiddleware` uses the standard OpenTelemetry API, so configure
exporters as you normally would:

```python
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter

# Set up the tracer provider with an OTLP exporter
provider = TracerProvider()
provider.add_span_processor(BatchSpanProcessor(OTLPSpanExporter()))
trace.set_tracer_provider(provider)

# Now create your queue — spans will be exported automatically
from taskito import Queue
from taskito.contrib.otel import OpenTelemetryMiddleware

queue = Queue(middleware=[OpenTelemetryMiddleware()])
```

## Configuration

`OpenTelemetryMiddleware` accepts several options to customize how spans
are created:

```python
OpenTelemetryMiddleware(
    tracer_name="my-service",
    span_name_fn=lambda ctx: f"task/{ctx.task_name}",
    attribute_prefix="myapp",
    extra_attributes_fn=lambda ctx: {"deployment.env": "prod"},
    task_filter=lambda name: not name.startswith("internal."),
)
```

| Parameter | Type | Default | Description |
|---|---|---|---|
| `tracer_name` | `str` | `"taskito"` | OpenTelemetry tracer 
name. | +| `span_name_fn` | `Callable[[JobContext], str] \| None` | `None` | Custom span name builder. Receives `JobContext`, returns a string. Defaults to `.execute.`. | +| `attribute_prefix` | `str` | `"taskito"` | Prefix for all span attribute keys. | +| `extra_attributes_fn` | `Callable[[JobContext], dict] \| None` | `None` | Returns extra attributes to add to each span. Receives `JobContext`. | +| `task_filter` | `Callable[[str], bool] \| None` | `None` | Predicate that receives a task name. Return `True` to trace, `False` to skip. `None` traces all tasks. | + +## Combining with other middleware + +`OpenTelemetryMiddleware` is a standard `TaskMiddleware`, so it composes +with other middleware: + +```python +queue = Queue(middleware=[ + OpenTelemetryMiddleware(), + MyLoggingMiddleware(), +]) +``` + + + `OpenTelemetryMiddleware` is thread-safe and can be used with + multi-worker configurations. Internal span tracking is protected by a + lock. + + +See the [Middleware guide](/docs/guides/extensibility/middleware) for more +on combining middleware. diff --git a/docs-next/content/docs/guides/integrations/prometheus.mdx b/docs-next/content/docs/guides/integrations/prometheus.mdx new file mode 100644 index 0000000..ca9e389 --- /dev/null +++ b/docs-next/content/docs/guides/integrations/prometheus.mdx @@ -0,0 +1,198 @@ +--- +title: Prometheus Metrics +description: "PrometheusMiddleware + PrometheusStatsCollector — counters, histograms, gauges, /metrics server." +--- + +taskito provides Prometheus metrics via a middleware and an optional stats +collector thread. + +## Installation + +```bash +pip install taskito[prometheus] +``` + +This installs `prometheus-client` as a dependency. 
+ +## PrometheusMiddleware + +Add `PrometheusMiddleware` to your queue to track per-task execution +metrics: + +```python +from taskito import Queue +from taskito.contrib.prometheus import PrometheusMiddleware + +queue = Queue(db_path="myapp.db", middleware=[PrometheusMiddleware()]) +``` + +### Configuration + +```python +PrometheusMiddleware( + namespace="myapp", + extra_labels_fn=lambda ctx: {"env": "prod", "region": "us-east-1"}, + disabled_metrics={"resource", "proxy"}, + task_filter=lambda name: not name.startswith("internal."), +) +``` + +| Parameter | Type | Default | Description | +|---|---|---|---| +| `namespace` | `str` | `"taskito"` | Prefix for all metric names. | +| `extra_labels_fn` | `Callable[[JobContext], dict[str, str]] \| None` | `None` | Returns extra labels to add to job metrics. Receives `JobContext`. | +| `disabled_metrics` | `set[str] \| None` | `None` | Metric groups or individual names to skip. Groups: `"jobs"`, `"queue"`, `"resource"`, `"proxy"`, `"intercept"`. | +| `task_filter` | `Callable[[str], bool] \| None` | `None` | Predicate that receives a task name. Return `True` to export metrics for the task, `False` to skip it. `None` exports all tasks. | + +### Metrics tracked + +| Metric | Type | Labels | Description | +|--------|------|--------|-------------| +| `taskito_jobs_total` | Counter | `task`, `status` | Total jobs processed (status: `completed` or `failed`) | +| `taskito_job_duration_seconds` | Histogram | `task` | Job execution duration | +| `taskito_active_workers` | Gauge | — | Number of currently executing workers | +| `taskito_retries_total` | Counter | `task` | Total retry attempts | + +## PrometheusStatsCollector + +For queue-level metrics, use the stats collector. 
It polls `queue.stats()` +on a background thread: + +```python +from taskito.contrib.prometheus import PrometheusStatsCollector + +collector = PrometheusStatsCollector(queue, interval=10) +collector.start() +``` + +### Configuration + +```python +PrometheusStatsCollector( + queue, + interval=10, + namespace="myapp", + disabled_metrics={"intercept"}, +) +``` + +| Parameter | Type | Default | Description | +|---|---|---|---| +| `queue` | `Queue` | — | The Queue instance to poll. | +| `interval` | `float` | `10.0` | Seconds between polls. | +| `namespace` | `str` | `"taskito"` | Prefix for metric names. Must match `PrometheusMiddleware` namespace to share metric objects. | +| `disabled_metrics` | `set[str] \| None` | `None` | Metric groups or names to skip. Same groups as `PrometheusMiddleware`. | + +### Metrics tracked + +| Metric | Type | Labels | Description | +|--------|------|--------|-------------| +| `taskito_queue_depth` | Gauge | `queue` | Number of pending jobs | +| `taskito_dlq_size` | Gauge | — | Number of dead-letter jobs | +| `taskito_worker_utilization` | Gauge | — | Ratio of running jobs to total workers (0.0–1.0) | + +## Metrics server + +Start a standalone `/metrics` endpoint for Prometheus to scrape: + +```python +from taskito.contrib.prometheus import start_metrics_server + +start_metrics_server(port=9090) +``` + +This uses `prometheus_client.start_http_server` under the hood. 
+ +## Full example + +```python +from taskito import Queue +from taskito.contrib.prometheus import ( + PrometheusMiddleware, + PrometheusStatsCollector, + start_metrics_server, +) + +queue = Queue(db_path="myapp.db", middleware=[PrometheusMiddleware()]) + +# Start metrics endpoint +start_metrics_server(port=9090) + +# Start queue stats polling +collector = PrometheusStatsCollector(queue, interval=10) +collector.start() +``` + +Prometheus scrape config: + +```yaml +scrape_configs: + - job_name: taskito + static_configs: + - targets: ["localhost:9090"] +``` + +## Grafana dashboard tips + +Useful panels for a taskito Grafana dashboard: + +- **Throughput** — `rate(taskito_jobs_total[5m])` by `task` and `status` +- **Duration p95** — `histogram_quantile(0.95, rate(taskito_job_duration_seconds_bucket[5m]))` +- **Queue depth** — `taskito_queue_depth` by `queue` +- **DLQ size** — `taskito_dlq_size` with alert threshold +- **Worker utilization** — `taskito_worker_utilization` as a gauge + +## Combining with other middleware + +`PrometheusMiddleware` composes with other middleware: + +```python +from taskito.contrib.otel import OpenTelemetryMiddleware +from taskito.contrib.sentry import SentryMiddleware + +queue = Queue( + db_path="myapp.db", + middleware=[ + OpenTelemetryMiddleware(), + PrometheusMiddleware(), + SentryMiddleware(), + ], +) +``` + +See the [Middleware guide](/docs/guides/extensibility/middleware) for more +on combining middleware. 
+ +## Example: alert on high DLQ size + +```python +from taskito.contrib.prometheus import PrometheusMiddleware, PrometheusStatsCollector, start_metrics_server + +queue = Queue(db_path="myapp.db", middleware=[PrometheusMiddleware()]) + +# Start metrics and collector +start_metrics_server(port=9090) +PrometheusStatsCollector(queue, interval=10).start() +``` + +Prometheus alerting rule: + +```yaml +groups: + - name: taskito + rules: + - alert: HighDLQSize + expr: taskito_dlq_size > 10 + for: 5m + labels: + severity: warning + annotations: + summary: "taskito dead letter queue has {{ $value }} entries" + - alert: HighErrorRate + expr: rate(taskito_jobs_total{status="failed"}[5m]) > 0.1 + for: 2m + labels: + severity: critical + annotations: + summary: "High task failure rate: {{ $value }} failures/sec" +``` diff --git a/docs-next/content/docs/guides/integrations/sentry.mdx b/docs-next/content/docs/guides/integrations/sentry.mdx new file mode 100644 index 0000000..e553d3a --- /dev/null +++ b/docs-next/content/docs/guides/integrations/sentry.mdx @@ -0,0 +1,136 @@ +--- +title: Sentry Integration +description: "SentryMiddleware — automatic exception capture, scope tags, retry breadcrumbs." +--- + +taskito provides a `SentryMiddleware` that automatically captures task +errors and sets rich context for Sentry. + +## Installation + +```bash +pip install taskito[sentry] +``` + +This installs `sentry-sdk` as a dependency. 
## Setup

Initialize the Sentry SDK as usual, then add `SentryMiddleware` to your
queue:

```python
import sentry_sdk
from taskito import Queue
from taskito.contrib.sentry import SentryMiddleware

sentry_sdk.init(dsn="https://examplePublicKey@o0.ingest.sentry.io/0")

queue = Queue(db_path="myapp.db", middleware=[SentryMiddleware()])
```

## What it does

### Scope tags

Each task execution gets a Sentry scope with the following tags (prefix
customizable via `tag_prefix`):

| Tag | Value |
|-----|-------|
| `taskito.task_name` | The registered task name |
| `taskito.job_id` | The job ID |
| `taskito.queue` | The queue name |
| `taskito.retry_count` | Current retry attempt |

### Transaction name

The Sentry transaction is set to `taskito:{task_name}` by default.
Customizable via `transaction_name_fn`.

### Automatic error capture

When a task raises an exception, `SentryMiddleware` calls
`sentry_sdk.capture_exception()` automatically. The exception appears in
Sentry with all the context tags attached.

### Retry breadcrumbs

When a task is retried, a breadcrumb is added with:

- **Category**: `taskito` (matches `tag_prefix`)
- **Level**: `warning`
- **Message**: `Retrying (attempt {retry_count}): {error}`

This gives you a trail of retry attempts leading up to a final failure.

## Configuration

```python
SentryMiddleware(
    tag_prefix="myapp",
    transaction_name_fn=lambda ctx: f"task-{ctx.task_name}",
    task_filter=lambda name: not name.startswith("internal."),
    extra_tags_fn=lambda ctx: {"worker.host": socket.gethostname()},
)
```

| Parameter | Type | Default | Description |
|---|---|---|---|
| `tag_prefix` | `str` | `"taskito"` | Prefix for Sentry tag keys and breadcrumb category. |
| `transaction_name_fn` | `Callable[[JobContext], str] \| None` | `None` | Custom transaction name builder. Receives `JobContext`. Defaults to `{tag_prefix}:{task_name}`. |
| `task_filter` | `Callable[[str], bool] \| None` | `None` | Predicate on task name. 
Return `True` to report, `False` to skip. `None` reports all tasks. | +| `extra_tags_fn` | `Callable[[JobContext], dict[str, str]] \| None` | `None` | Returns extra Sentry tags to set. Receives `JobContext`. | + +## Combining with other middleware + +`SentryMiddleware` composes with other observability middleware: + +```python +from taskito.contrib.otel import OpenTelemetryMiddleware +from taskito.contrib.prometheus import PrometheusMiddleware + +queue = Queue( + db_path="myapp.db", + middleware=[ + OpenTelemetryMiddleware(), + PrometheusMiddleware(), + SentryMiddleware(), + ], +) +``` + +See the [Middleware guide](/docs/guides/extensibility/middleware) for more +on combining middleware. + +## Full example + +```python +import sentry_sdk +from taskito import Queue +from taskito.contrib.sentry import SentryMiddleware + +# Initialize Sentry first +sentry_sdk.init( + dsn="https://examplePublicKey@o0.ingest.sentry.io/0", + traces_sample_rate=1.0, +) + +# Create queue with Sentry middleware +queue = Queue(db_path="myapp.db", middleware=[SentryMiddleware()]) + +@queue.task(max_retries=3) +def process_payment(order_id: str, amount: float): + """Process a payment — errors are automatically reported to Sentry.""" + result = payment_gateway.charge(order_id, amount) + if not result.success: + raise PaymentError(f"Payment failed: {result.error}") + return result.transaction_id +``` + +When `process_payment` fails: + +1. The error appears in Sentry with tags `taskito.task_name=myapp.tasks.process_payment`, `taskito.job_id=...`, `taskito.queue=default` +2. If the task retries, each retry is recorded as a breadcrumb +3. The final failure (after all retries) includes the full breadcrumb trail +```