Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
159 changes: 159 additions & 0 deletions docs-next/content/docs/guides/integrations/django.mdx
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
---
title: Django Integration
description: "Admin views, settings, autodiscovery, management commands — no Django ORM models required."
---

import { Callout } from "fumadocs-ui/components/callout";

taskito provides Django admin views for browsing jobs, inspecting dead
letters, and viewing queue statistics — all without Django ORM models.

## Installation

```bash
pip install taskito[django]
```

## Setup

### Option 1: register on the default admin site

In your project's `admin.py` or `urls.py`:

```python
from taskito.contrib.django.admin import register_taskito_admin

register_taskito_admin()
```

This adds taskito views to the default `admin.site`.

### Option 2: custom admin site

Use `TaskitoAdminSite` for a dedicated admin:

```python
from taskito.contrib.django.admin import TaskitoAdminSite

admin_site = TaskitoAdminSite(name="taskito_admin")
```

## Django settings

The following settings can be defined in your Django `settings.py`:

| Setting | Default | Description |
|---------|---------|-------------|
| `TASKITO_AUTODISCOVER_MODULE` | `"tasks"` | Module name auto-discovered in each installed app on startup. |
| `TASKITO_ADMIN_PER_PAGE` | `50` | Rows per page in the admin jobs and dead letters views. |
| `TASKITO_ADMIN_TITLE` | `"Taskito"` | Browser tab title for `TaskitoAdminSite`. |
| `TASKITO_ADMIN_HEADER` | `"Taskito Admin"` | Site header shown in `TaskitoAdminSite`. |
| `TASKITO_WATCH_INTERVAL` | `2` | Polling interval in seconds for `manage.py taskito_info --watch`. |
| `TASKITO_DASHBOARD_HOST` | `"127.0.0.1"` | Default bind host for `manage.py taskito_dashboard`. |
| `TASKITO_DASHBOARD_PORT` | `8080` | Default bind port for `manage.py taskito_dashboard`. |

Example:

```python
# settings.py
TASKITO_AUTODISCOVER_MODULE = "jobs" # import myapp.jobs instead of myapp.tasks
TASKITO_ADMIN_PER_PAGE = 25
TASKITO_ADMIN_TITLE = "MyApp Tasks"
TASKITO_ADMIN_HEADER = "MyApp Task Queue"
TASKITO_DASHBOARD_HOST = "0.0.0.0"
TASKITO_DASHBOARD_PORT = 9000
```

## Queue configuration

Create a `taskito` queue instance in your Django project. The
`get_queue()` function in `taskito.contrib.django.settings` is used to
retrieve the queue instance.

```python
# myproject/tasks.py
from taskito import Queue

queue = Queue(db_path="taskito.db")

@queue.task()
def send_welcome_email(user_id: int):
from myapp.models import User
user = User.objects.get(id=user_id)
user.email_user("Welcome!", "Thanks for signing up.")
```

<Callout type="info" title="Lazy imports">
Import Django models inside the task function body to avoid app registry
issues during startup.
</Callout>

## Admin views

The integration provides the following views under `/admin/taskito/`:

- **Dashboard** — Queue statistics overview (pending, running, completed, failed, dead, cancelled)
- **Jobs** — Paginated job list with status, queue, and task name filters
- **Job Detail** — Full job payload, error history, retry count, and metadata
- **Dead Letters** — Browse and retry dead letter entries

## Running the worker

```bash
DJANGO_SETTINGS_MODULE=myproject.settings taskito worker --app myproject.tasks:queue
```

## Full example

### Project structure

```
myproject/
settings.py
urls.py
tasks.py # Queue + task definitions
myapp/
admin.py # Register taskito admin views
views.py
models.py
```

### `tasks.py`

```python
from taskito import Queue

queue = Queue(db_path="taskito.db")

@queue.task(max_retries=3)
def send_welcome_email(user_id: int):
from myapp.models import User
user = User.objects.get(id=user_id)
user.email_user("Welcome!", "Thanks for signing up.")

@queue.task(rate_limit="60/h")
def generate_monthly_report(month: int, year: int):
from myapp.reports import build_report
return build_report(month, year)
```

### `myapp/admin.py`

```python
from django.contrib import admin
from taskito.contrib.django.admin import register_taskito_admin

register_taskito_admin()
```

### `myapp/views.py`

```python
from django.http import JsonResponse
from myproject.tasks import send_welcome_email

def signup_view(request):
user = create_user(request.POST)
send_welcome_email.delay(user.id)
return JsonResponse({"status": "ok"})
```
207 changes: 207 additions & 0 deletions docs-next/content/docs/guides/integrations/fastapi.mdx
Original file line number Diff line number Diff line change
@@ -0,0 +1,207 @@
---
title: FastAPI Integration
description: "TaskitoRouter — pre-built APIRouter with job status, SSE progress, dead letter management."
---

taskito provides a pre-built `APIRouter` for FastAPI with endpoints for
job management, progress streaming via SSE, and dead letter queue
operations.

## Installation

```bash
pip install taskito[fastapi]
```

This installs `fastapi` and `pydantic` as extras.

## Quick setup

```python
from fastapi import FastAPI
from taskito import Queue
from taskito.contrib.fastapi import TaskitoRouter

queue = Queue(db_path="myapp.db")

@queue.task()
def process_data(payload: dict) -> str:
return "done"

app = FastAPI()
app.include_router(TaskitoRouter(queue), prefix="/tasks")
```

Run with:

```bash
uvicorn myapp:app --reload
```

## Endpoints

| Method | Path | Description |
|--------|------|-------------|
| `GET` | `/stats` | Queue statistics |
| `GET` | `/stats/queues` | Per-queue statistics |
| `GET` | `/jobs/{job_id}` | Job status, progress, and metadata |
| `GET` | `/jobs/{job_id}/errors` | Error history for a job |
| `GET` | `/jobs/{job_id}/result` | Job result (optional `?timeout=N` for blocking) |
| `GET` | `/jobs/{job_id}/progress` | SSE stream of progress updates |
| `POST` | `/jobs/{job_id}/cancel` | Cancel a pending job |
| `GET` | `/dead-letters` | List dead letter entries (paginated) |
| `POST` | `/dead-letters/{dead_id}/retry` | Re-enqueue a dead letter |
| `GET` | `/health` | Liveness check |
| `GET` | `/readiness` | Readiness check |
| `GET` | `/resources` | Worker resource status |

## Configuration

`TaskitoRouter` accepts options to control which routes are registered,
how results are serialized, and page sizes:

```python
from fastapi import Depends, HTTPException
from taskito.contrib.fastapi import TaskitoRouter

def require_api_key(x_api_key: str = Header(...)):
if x_api_key != "secret":
raise HTTPException(status_code=403)

app.include_router(
TaskitoRouter(
queue,
include_routes={"stats", "jobs", "dead-letters", "retry-dead"},
dependencies=[Depends(require_api_key)],
sse_poll_interval=1.0,
result_timeout=5.0,
default_page_size=25,
max_page_size=200,
result_serializer=lambda v: v if isinstance(v, (str, int, float, bool, None)) else str(v),
),
prefix="/tasks",
)
```

| Parameter | Type | Default | Description |
|---|---|---|---|
| `include_routes` | `set[str] \| None` | `None` | If set, only register these route names. Cannot be combined with `exclude_routes`. |
| `exclude_routes` | `set[str] \| None` | `None` | If set, skip these route names. Cannot be combined with `include_routes`. |
| `dependencies` | `Sequence[Depends] \| None` | `None` | FastAPI dependencies applied to every route (e.g. auth). |
| `sse_poll_interval` | `float` | `0.5` | Seconds between SSE progress polls. |
| `result_timeout` | `float` | `1.0` | Default timeout for non-blocking result fetch. |
| `default_page_size` | `int` | `20` | Default page size for paginated endpoints. |
| `max_page_size` | `int` | `100` | Maximum allowed page size. |
| `result_serializer` | `Callable[[Any], Any] \| None` | `None` | Custom result serializer. Receives any value, must return a JSON-serializable value. |

Valid route names: `stats`, `jobs`, `job-errors`, `job-result`,
`job-progress`, `cancel`, `dead-letters`, `retry-dead`, `health`,
`readiness`, `resources`, `queue-stats`.

## Blocking result fetch

The `/jobs/{job_id}/result` endpoint supports an optional `timeout` query
parameter (0–300 seconds). When `timeout > 0`, the request blocks until
the job completes or the timeout elapses:

```bash
# Non-blocking (default)
curl http://localhost:8000/tasks/jobs/01H5K6X.../result

# Block up to 30 seconds for the result
curl http://localhost:8000/tasks/jobs/01H5K6X.../result?timeout=30
```

## SSE progress streaming

Stream real-time progress for a running job using Server-Sent Events:

```python
import httpx

with httpx.stream("GET", "http://localhost:8000/tasks/jobs/01H5K6X.../progress") as r:
for line in r.iter_lines():
print(line)
# data: {"progress": 25, "status": "running"}
# data: {"progress": 50, "status": "running"}
# data: {"progress": 100, "status": "completed"}
```

From the browser:

```javascript
const source = new EventSource("/tasks/jobs/01H5K6X.../progress");
source.onmessage = (event) => {
const data = JSON.parse(event.data);
console.log(`Progress: ${data.progress}%`);
if (data.status === "completed" || data.status === "failed") {
source.close();
}
};
```

The stream sends a JSON event every 0.5 seconds while the job is active,
then a final event when the job reaches a terminal state.

## Pydantic response models

All endpoints return validated Pydantic models with clean OpenAPI docs.
You can import them for type-safe client code:

```python
from taskito.contrib.fastapi import (
StatsResponse,
JobResponse,
JobErrorResponse,
JobResultResponse,
CancelResponse,
DeadLetterResponse,
RetryResponse,
)
```

## Full example

```python
from fastapi import FastAPI, Header, HTTPException, Depends
from taskito import Queue, current_job
from taskito.contrib.fastapi import TaskitoRouter

queue = Queue(db_path="myapp.db")

@queue.task()
def resize_image(image_url: str, sizes: list[int]) -> dict:
results = {}
for i, size in enumerate(sizes):
results[size] = do_resize(image_url, size)
current_job.update_progress(int((i + 1) / len(sizes) * 100))
return results

async def require_auth(authorization: str = Header(...)):
if not authorization.startswith("Bearer "):
raise HTTPException(401)

app = FastAPI(title="Image Service")
app.include_router(
TaskitoRouter(queue, dependencies=[Depends(require_auth)]),
prefix="/tasks",
tags=["tasks"],
)

# Start worker in a separate process:
# taskito worker --app myapp:queue
```

```bash
# Check job status
curl http://localhost:8000/tasks/jobs/01H5K6X... \
-H "Authorization: Bearer mytoken"

# Stream progress
curl -N http://localhost:8000/tasks/jobs/01H5K6X.../progress \
-H "Authorization: Bearer mytoken"

# Block for result (up to 60s)
curl http://localhost:8000/tasks/jobs/01H5K6X.../result?timeout=60 \
-H "Authorization: Bearer mytoken"
```
Loading
Loading