diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index da0d4c6..a203bed 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -18,7 +18,7 @@ jobs: GITHUB_CONTEXT: ${{ toJson(github) }} run: echo "$GITHUB_CONTEXT" - - uses: actions/checkout@v5 + - uses: actions/checkout@v6 - name: Set up Python uses: actions/setup-python@v6 diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index a585fad..97eb930 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -15,7 +15,7 @@ jobs: if: github.event.pull_request.draft == false runs-on: ubuntu-latest steps: - - uses: actions/checkout@v5 + - uses: actions/checkout@v6 - uses: actions/setup-python@v6 with: python-version: 3.13 @@ -41,11 +41,11 @@ jobs: runs-on: ubuntu-latest strategy: matrix: - python-version: ["3.10", "3.11", "3.12", "3.13"] + python-version: ["3.10", "3.11", "3.12", "3.13", "3.14"] fail-fast: false steps: - - uses: actions/checkout@v5 + - uses: actions/checkout@v6 - name: Set up Python uses: actions/setup-python@v6 with: @@ -64,7 +64,7 @@ jobs: COVERAGE_FILE: coverage/.coverage.${{ runner.os }}-py${{ matrix.python-version }}-${{ matrix.pydantic-version }} CONTEXT: ${{ runner.os }}-py${{ matrix.python-version }}-${{ matrix.pydantic-version }} - name: Store coverage files - uses: actions/upload-artifact@v4 + uses: actions/upload-artifact@v6 with: name: .coverage.${{ runner.os }}-py${{ matrix.python-version }}-${{ matrix.pydantic-version }} path: coverage @@ -76,7 +76,7 @@ jobs: runs-on: ubuntu-latest steps: - - uses: actions/checkout@v5 + - uses: actions/checkout@v6 - uses: actions/setup-python@v6 with: @@ -87,7 +87,7 @@ jobs: version: "latest" - name: Get coverage files - uses: actions/download-artifact@v5 + uses: actions/download-artifact@v7 with: pattern: .coverage* path: coverage @@ -101,7 +101,7 @@ jobs: - run: coverage html --show-contexts --title "taskiq-faststream coverage for ${{ github.sha }}" - name: Store coverage html - uses: actions/upload-artifact@v4 + uses: actions/upload-artifact@v6 with: name: coverage-html path: htmlcov diff --git a/README.md b/README.md index ecf9bc9..e1c52e0 100644 --- a/README.md +++ b/README.md @@ -46,6 +46,12 @@ pip install taskiq-faststream[nats] pip install taskiq-faststream[redis] ``` +For **OpenTelemetry** distributed tracing support: + +```bash +pip install taskiq-faststream[otel] +``` + ## Usage The package gives you two classes: `AppWrapper` and `BrokerWrapper` @@ -141,3 +147,39 @@ taskiq_broker.task( ..., ) ``` + +## OpenTelemetry Support + +**taskiq-faststream** supports taskiq's OpenTelemetry middleware. To enable it, pass `OpenTelemetryMiddleware` when creating the broker wrapper: + +```python +from faststream.nats import NatsBroker +from taskiq_faststream import BrokerWrapper +from taskiq.middlewares.otel_middleware import OpenTelemetryMiddleware + +broker = NatsBroker() + +# Enable OpenTelemetry middleware +taskiq_broker = BrokerWrapper(broker, middlewares=[OpenTelemetryMiddleware()]) +``` + +This will automatically add OpenTelemetry middleware to track task execution, providing insights into: +- Task execution spans +- Task dependencies and call chains +- Performance metrics +- Error tracking + +Make sure to configure your OpenTelemetry exporter (e.g., Jaeger, Zipkin) according to your monitoring setup. + +The same applies to `AppWrapper`: + +```python +from faststream import FastStream +from taskiq_faststream import AppWrapper +from taskiq.middlewares.otel_middleware import OpenTelemetryMiddleware + +app = FastStream(broker) + +# Enable OpenTelemetry middleware +taskiq_broker = AppWrapper(app, middlewares=[OpenTelemetryMiddleware()]) +``` diff --git a/pyproject.toml b/pyproject.toml index b99c4f2..180253f 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,13 +1,13 @@ [project] name = "taskiq-faststream" -version = "0.3.2" +version = "0.4.0" description = "FastStream - taskiq integration to schedule FastStream tasks" readme = "README.md" license = "MIT" license-files = ["LICENSE"] authors = [ { name = "Taskiq team", email = "taskiq@no-reply.com" }, - { name = "Nikita Pastukhov", email = "nikita@pastukhov-dev.com" }, + { name = "Nikita Pastukhov", email = "nikita@pastukhov-dev.ru" }, ] keywords = ["taskiq", "tasks", "distributed", "async", "FastStream"] @@ -40,7 +40,7 @@ classifiers = [ dependencies = [ - "taskiq>=0.11.0,<0.12.0", + "taskiq>=0.12.1,<0.13.0", "faststream>=0.3.14,<0.7", ] @@ -65,6 +65,10 @@ redis = [ "faststream[redis]" ] +otel = [ + "taskiq[opentelemetry]" +] + [dependency-groups] test = [ "taskiq-faststream[nats]", @@ -72,16 +76,16 @@ test = [ "taskiq-faststream[kafka]", "taskiq-faststream[confluent]", "taskiq-faststream[redis]", - + "taskiq-faststream[otel]", "coverage[toml]>=7.2.0,<8.0.0", - "pytest>=7.4.0,<9", + "pytest>=7.4.0,<10", "freezegun>=1.2.2" ] dev = [ {include-group = "test"}, - "mypy==1.18.2", - "ruff==0.14.0", + "mypy==1.19.1", + "ruff==0.14.10", "pre-commit >=3.6.0,<5.0.0", ] diff --git a/taskiq_faststream/broker.py b/taskiq_faststream/broker.py index f5a6009..6ed7d97 100644 --- a/taskiq_faststream/broker.py +++ b/taskiq_faststream/broker.py @@ -1,11 +1,13 @@ import typing import warnings +from collections.abc import Iterable from typing import Any, TypeAlias import anyio from faststream._internal.application import Application from faststream.types import SendableMessage from taskiq import AsyncBroker +from taskiq.abc.middleware import TaskiqMiddleware from taskiq.acks import AckableMessage from taskiq.decor import AsyncTaskiqDecoratedTask @@ -30,10 +32,22 @@ class BrokerWrapper(AsyncBroker): task : Register FastStream scheduled task. """ - def __init__(self, broker: Any) -> None: + def __init__( + self, + broker: Any, + *, + middlewares: Iterable[TaskiqMiddleware] = (), + ) -> None: + """Initialize BrokerWrapper. + + Args: + broker: FastStream broker instance to wrap. + middlewares: Middlewares to add to the broker. + """ super().__init__() self.formatter = PatchedFormatter() self.broker = broker + self.add_middlewares(*middlewares) async def startup(self) -> None: """Startup wrapped FastStream broker.""" @@ -105,10 +119,22 @@ class AppWrapper(BrokerWrapper): task : Register FastStream scheduled task. """ - def __init__(self, app: Application) -> None: + def __init__( + self, + app: Application, + *, + middlewares: Iterable[TaskiqMiddleware] = (), + ) -> None: + """Initialize AppWrapper. + + Args: + app: FastStream application instance to wrap. + middlewares: Middlewares to add to the broker. + """ super(BrokerWrapper, self).__init__() self.formatter = PatchedFormatter() self.app = app + self.add_middlewares(*middlewares) async def startup(self) -> None: """Startup wrapped FastStream."""