Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ jobs:
GITHUB_CONTEXT: ${{ toJson(github) }}
run: echo "$GITHUB_CONTEXT"

- uses: actions/checkout@v5
- uses: actions/checkout@v6

- name: Set up Python
uses: actions/setup-python@v6
Expand Down
14 changes: 7 additions & 7 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ jobs:
if: github.event.pull_request.draft == false
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v5
- uses: actions/checkout@v6
- uses: actions/setup-python@v6
with:
python-version: 3.13
Expand All @@ -41,11 +41,11 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
python-version: ["3.10", "3.11", "3.12", "3.13"]
python-version: ["3.10", "3.11", "3.12", "3.13", "3.14"]
fail-fast: false

steps:
- uses: actions/checkout@v5
- uses: actions/checkout@v6
- name: Set up Python
uses: actions/setup-python@v6
with:
Expand All @@ -64,7 +64,7 @@ jobs:
COVERAGE_FILE: coverage/.coverage.${{ runner.os }}-py${{ matrix.python-version }}-${{ matrix.pydantic-version }}
CONTEXT: ${{ runner.os }}-py${{ matrix.python-version }}-${{ matrix.pydantic-version }}
- name: Store coverage files
uses: actions/upload-artifact@v4
uses: actions/upload-artifact@v6
with:
name: .coverage.${{ runner.os }}-py${{ matrix.python-version }}-${{ matrix.pydantic-version }}
path: coverage
Expand All @@ -76,7 +76,7 @@ jobs:
runs-on: ubuntu-latest

steps:
- uses: actions/checkout@v5
- uses: actions/checkout@v6

- uses: actions/setup-python@v6
with:
Expand All @@ -87,7 +87,7 @@ jobs:
version: "latest"

- name: Get coverage files
uses: actions/download-artifact@v5
uses: actions/download-artifact@v7
with:
pattern: .coverage*
path: coverage
Expand All @@ -101,7 +101,7 @@ jobs:
- run: coverage html --show-contexts --title "taskiq-faststream coverage for ${{ github.sha }}"

- name: Store coverage html
uses: actions/upload-artifact@v4
uses: actions/upload-artifact@v6
with:
name: coverage-html
path: htmlcov
Expand Down
42 changes: 42 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,12 @@ pip install taskiq-faststream[nats]
pip install taskiq-faststream[redis]
```

For **OpenTelemetry** distributed tracing support:

```bash
pip install taskiq-faststream[otel]
```

## Usage

The package gives you two classes: `AppWrapper` and `BrokerWrapper`
Expand Down Expand Up @@ -141,3 +147,39 @@ taskiq_broker.task(
...,
)
```

## OpenTelemetry Support

**taskiq-faststream** supports taskiq's OpenTelemetry middleware. To enable it, pass `OpenTelemetryMiddleware` when creating the broker wrapper:

```python
from faststream.nats import NatsBroker
from taskiq_faststream import BrokerWrapper
from taskiq.middlewares.otel_middleware import OpenTelemetryMiddleware

broker = NatsBroker()

# Enable OpenTelemetry middleware
taskiq_broker = BrokerWrapper(broker, middlewares=[OpenTelemetryMiddleware()])
```

This will automatically add OpenTelemetry middleware to track task execution, providing insights into:
- Task execution spans
- Task dependencies and call chains
- Performance metrics
- Error tracking

Make sure to configure your OpenTelemetry exporter (e.g., Jaeger, Zipkin) according to your monitoring setup.

The same applies to `AppWrapper`:

```python
from faststream import FastStream
from taskiq_faststream import AppWrapper
from taskiq.middlewares.otel_middleware import OpenTelemetryMiddleware

app = FastStream(broker)

# Enable OpenTelemetry middleware
taskiq_broker = AppWrapper(app, middlewares=[OpenTelemetryMiddleware()])
```
18 changes: 11 additions & 7 deletions pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
[project]
name = "taskiq-faststream"
version = "0.3.2"
version = "0.4.0"
description = "FastStream - taskiq integration to schedule FastStream tasks"
readme = "README.md"
license = "MIT"
license-files = ["LICENSE"]
authors = [
{ name = "Taskiq team", email = "taskiq@no-reply.com" },
{ name = "Nikita Pastukhov", email = "nikita@pastukhov-dev.com" },
{ name = "Nikita Pastukhov", email = "nikita@pastukhov-dev.ru" },
]

keywords = ["taskiq", "tasks", "distributed", "async", "FastStream"]
Expand Down Expand Up @@ -40,7 +40,7 @@ classifiers = [


dependencies = [
"taskiq>=0.11.0,<0.12.0",
"taskiq>=0.12.1,<0.13.0",
"faststream>=0.3.14,<0.7",
]

Expand All @@ -65,23 +65,27 @@ redis = [
"faststream[redis]"
]

otel = [
"taskiq[opentelemetry]"
]

[dependency-groups]
test = [
"taskiq-faststream[nats]",
"taskiq-faststream[rabbit]",
"taskiq-faststream[kafka]",
"taskiq-faststream[confluent]",
"taskiq-faststream[redis]",

"taskiq-faststream[otel]",
"coverage[toml]>=7.2.0,<8.0.0",
"pytest>=7.4.0,<9",
"pytest>=7.4.0,<10",
"freezegun>=1.2.2"
]

dev = [
{include-group = "test"},
"mypy==1.18.2",
"ruff==0.14.0",
"mypy==1.19.1",
"ruff==0.14.10",
"pre-commit >=3.6.0,<5.0.0",
]

Expand Down
30 changes: 28 additions & 2 deletions taskiq_faststream/broker.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
import typing
import warnings
from collections.abc import Iterable
from typing import Any, TypeAlias

import anyio
from faststream._internal.application import Application
from faststream.types import SendableMessage
from taskiq import AsyncBroker
from taskiq.abc.middleware import TaskiqMiddleware
from taskiq.acks import AckableMessage
from taskiq.decor import AsyncTaskiqDecoratedTask

Expand All @@ -30,10 +32,22 @@ class BrokerWrapper(AsyncBroker):
task : Register FastStream scheduled task.
"""

def __init__(self, broker: Any) -> None:
def __init__(
self,
broker: Any,
*,
middlewares: Iterable[TaskiqMiddleware] = (),
) -> None:
"""Initialize BrokerWrapper.

Args:
broker: FastStream broker instance to wrap.
middlewares: Middlewares to add to the broker.
"""
super().__init__()
self.formatter = PatchedFormatter()
self.broker = broker
self.add_middlewares(*middlewares)

async def startup(self) -> None:
"""Startup wrapped FastStream broker."""
Expand Down Expand Up @@ -105,10 +119,22 @@ class AppWrapper(BrokerWrapper):
task : Register FastStream scheduled task.
"""

def __init__(self, app: Application) -> None:
def __init__(
self,
app: Application,
*,
middlewares: Iterable[TaskiqMiddleware] = (),
) -> None:
"""Initialize AppWrapper.

Args:
app: FastStream application instance to wrap.
middlewares: Middlewares to add to the broker.
"""
super(BrokerWrapper, self).__init__()
self.formatter = PatchedFormatter()
self.app = app
self.add_middlewares(*middlewares)

async def startup(self) -> None:
"""Startup wrapped FastStream."""
Expand Down