diff --git a/src/instana/__init__.py b/src/instana/__init__.py index 43f27465..046ddccc 100644 --- a/src/instana/__init__.py +++ b/src/instana/__init__.py @@ -166,6 +166,7 @@ def boot_agent() -> None: # Import & initialize instrumentation from instana.instrumentation import ( + aioamqp, # noqa: F401 asyncio, # noqa: F401 boto3_inst, # noqa: F401 cassandra_inst, # noqa: F401 @@ -208,6 +209,7 @@ def boot_agent() -> None: client as tornado_client, # noqa: F401 ) from instana.instrumentation.tornado import ( + client as tornado_client, # noqa: F401 server as tornado_server, # noqa: F401 ) diff --git a/src/instana/instrumentation/aioamqp.py b/src/instana/instrumentation/aioamqp.py new file mode 100644 index 00000000..b7ca7e7a --- /dev/null +++ b/src/instana/instrumentation/aioamqp.py @@ -0,0 +1,78 @@ +# (c) Copyright IBM Corp. 2025 + +try: + import aioamqp + from typing import Any, Callable, Dict, Tuple + + import wrapt + from opentelemetry.trace.status import StatusCode + + from instana.log import logger + from instana.util.traceutils import get_tracer_tuple, tracing_is_off + + @wrapt.patch_function_wrapper("aioamqp.channel", "Channel.basic_publish") + async def basic_publish_with_instana( + wrapped: Callable[..., aioamqp.connect], + instance: object, + argv: Tuple[object, Tuple[object, ...]], + kwargs: Dict[str, Any], + ) -> object: + if tracing_is_off(): + return await wrapped(*argv, **kwargs) + + tracer, parent_span, _ = get_tracer_tuple() + parent_context = parent_span.get_span_context() if parent_span else None + with tracer.start_as_current_span( + "aioamqp-publisher", span_context=parent_context + ) as span: + try: + span.set_attribute("aioamqp.exchange", argv[0]) + return await wrapped(*argv, **kwargs) + except Exception as exc: + span.record_exception(exc) + logger.debug(f"aioamqp basic_publish_with_instana error: {exc}") + + @wrapt.patch_function_wrapper("aioamqp.channel", "Channel.basic_consume") + def basic_consume_with_instana( + wrapped: Callable[..., aioamqp.connect], + instance: object, + argv: Tuple[object, Tuple[object, ...]], + kwargs: Dict[str, Any], + ) -> object: + if tracing_is_off(): + return wrapped(*argv, **kwargs) + + callback = argv[0] + tracer, parent_span, _ = get_tracer_tuple() + parent_context = parent_span.get_span_context() if parent_span else None + + @wrapt.decorator + async def callback_wrapper( + wrapped_callback: Callable[..., aioamqp.connect], + instance: Any, + args: Tuple, + kwargs: Dict, + ) -> object: + with tracer.start_as_current_span( + "aioamqp-consumer", span_context=parent_context + ) as span: + try: + span.set_status(StatusCode.OK) + span.set_attribute("aioamqp.callback", callback) + span.set_attribute("aioamqp.message", args[1]) + span.set_attribute("aioamqp.exchange_name", args[2].exchange_name) + span.set_attribute("aioamqp.routing_key", args[2].routing_key) + return await wrapped_callback(*args, **kwargs) + except Exception as exc: + span.record_exception(exc) + logger.debug(f"aioamqp basic_consume_with_instana error: {exc}") + + wrapped_callback = callback_wrapper(callback) + argv = (wrapped_callback,) + argv[1:] + + return wrapped(*argv, **kwargs) + + logger.debug("Instrumenting aioamqp") + +except ImportError: + pass diff --git a/tests/frameworks/test_aioamqp.py b/tests/frameworks/test_aioamqp.py new file mode 100644 index 00000000..aa2deb78 --- /dev/null +++ b/tests/frameworks/test_aioamqp.py @@ -0,0 +1,131 @@ +import asyncio +from typing import Any, Generator + +import aioamqp +import pytest + +from instana.singletons import tracer +from tests.helpers import testenv +from aioamqp.properties import Properties +from aioamqp.envelope import Envelope + +testenv["rabbitmq_host"] = "127.0.0.1" +testenv["rabbitmq_port"] = 5672 + + +class TestAioamqp: + @pytest.fixture(autouse=True) + def _resource(self) -> Generator[None, None, None]: + self.recorder = tracer.span_processor + self.recorder.clear_spans() + + self.loop = asyncio.new_event_loop() + asyncio.set_event_loop(None) + yield + self.loop.run_until_complete(self.delete_queue()) + if self.loop.is_running(): + self.loop.close() + + async def delete_queue(self) -> None: + transport, protocol = await aioamqp.connect( + testenv["rabbitmq_host"], + testenv["rabbitmq_port"], + ) + channel = await protocol.channel() + await channel.queue_delete("message_queue") + await asyncio.sleep(1) + + async def publish_message(self) -> None: + transport, protocol = await aioamqp.connect( + testenv["rabbitmq_host"], + testenv["rabbitmq_port"], + ) + channel = await protocol.channel() + + await channel.queue_declare(queue_name="message_queue") + + message = "Instana test message" + await channel.basic_publish( + message.encode(), exchange_name="", routing_key="message_queue" + ) + + await protocol.close() + transport.close() + + async def consume_message(self) -> None: + async def callback( + channel: Any, + body: bytes, + envelope: Envelope, + properties: Properties, + ) -> None: + with tracer.start_as_current_span("callback-span"): + await channel.basic_client_ack(delivery_tag=envelope.delivery_tag) + + _, protocol = await aioamqp.connect( + testenv["rabbitmq_host"], testenv["rabbitmq_port"] + ) + channel = await protocol.channel() + await channel.queue_declare(queue_name="message_queue") + await channel.basic_consume(callback, queue_name="message_queue", no_ack=False) + + def test_basic_publish(self) -> None: + with tracer.start_as_current_span("test-span"): + self.loop.run_until_complete(self.publish_message()) + + spans = self.recorder.queued_spans() + + assert len(spans) == 2 + publisher_span = spans[0] + test_span = spans[1] + + assert publisher_span.n == "sdk" + assert publisher_span.data["sdk"]["name"] == "aioamqp-publisher" + assert publisher_span.p == test_span.s + + assert test_span.n == "sdk" + assert not test_span.p + + def test_basic_consumer(self) -> None: + with tracer.start_as_current_span("test-span"): + self.loop.run_until_complete(self.publish_message()) + self.loop.run_until_complete(self.consume_message()) + + spans = self.recorder.queued_spans() + + assert len(spans) == 4 + + publisher_span = spans[0] + callback_span = spans[1] + consumer_span = spans[2] + test_span = spans[3] + + assert publisher_span.n == "sdk" + assert publisher_span.data["sdk"]["name"] == "aioamqp-publisher" + assert publisher_span.p == test_span.s + assert ( + publisher_span.data["sdk"]["custom"]["tags"]["aioamqp.exchange"] + == "b'Instana test message'" + ) + + assert callback_span.n == "sdk" + assert callback_span.data["sdk"]["name"] == "callback-span" + assert callback_span.data["sdk"]["type"] == "intermediate" + assert callback_span.p == consumer_span.s + + assert consumer_span.n == "sdk" + assert consumer_span.data["sdk"]["name"] == "aioamqp-consumer" + assert consumer_span.data["sdk"]["custom"]["tags"]["aioamqp.callback"] + assert ( + consumer_span.data["sdk"]["custom"]["tags"]["aioamqp.message"] + == "b'Instana test message'" + ) + assert ( + consumer_span.data["sdk"]["custom"]["tags"]["aioamqp.routing_key"] + == "message_queue" + ) + assert not consumer_span.data["sdk"]["custom"]["tags"]["exchange_name"] + assert consumer_span.p == test_span.s + + assert test_span.n == "sdk" + assert test_span.data["sdk"]["name"] == "test-span" diff --git a/tests/helpers.py b/tests/helpers.py index 7f496814..622875d5 100644 --- a/tests/helpers.py +++ b/tests/helpers.py @@ -57,6 +57,12 @@ testenv["mongodb_user"] = os.environ.get("MONGO_USER", None) testenv["mongodb_pw"] = os.environ.get("MONGO_PW", None) +""" +RabbitMQ Environment +""" +testenv["rabbitmq_host"] = os.environ.get("RABBITMQ_HOST", "127.0.0.1") +testenv["rabbitmq_port"] = os.environ.get("RABBITMQ_PORT", 5672) + """ Kafka Environment diff --git a/tests/requirements-pre314.txt b/tests/requirements-pre314.txt index 686aac11..c57055b7 100644 --- a/tests/requirements-pre314.txt +++ b/tests/requirements-pre314.txt @@ -1,3 +1,4 @@ +aioamqp>=0.15.0 aiofiles>=0.5.0 aiohttp>=3.8.3 boto3>=1.17.74 diff --git a/tests/requirements.txt b/tests/requirements.txt index 5d6eb85e..ad4fd0ed 100644 --- a/tests/requirements.txt +++ b/tests/requirements.txt @@ -1,3 +1,4 @@ +aioamqp>=0.15.0 aiofiles>=0.5.0 aiohttp>=3.8.3 boto3>=1.17.74