Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/instana/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@ def boot_agent() -> None:

# Import & initialize instrumentation
from instana.instrumentation import (
aioamqp, # noqa: F401
asyncio, # noqa: F401
boto3_inst, # noqa: F401
cassandra_inst, # noqa: F401
Expand Down Expand Up @@ -208,6 +209,7 @@ def boot_agent() -> None:
client as tornado_client, # noqa: F401
)
from instana.instrumentation.tornado import (
client as tornado_client, # noqa: F401
server as tornado_server, # noqa: F401
)

Expand Down
78 changes: 78 additions & 0 deletions src/instana/instrumentation/aioamqp.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
# (c) Copyright IBM Corp. 2025

try:
import aioamqp
from typing import Any, Callable, Dict, Tuple

import wrapt
from opentelemetry.trace.status import StatusCode

from instana.log import logger
from instana.util.traceutils import get_tracer_tuple, tracing_is_off

@wrapt.patch_function_wrapper("aioamqp.channel", "Channel.basic_publish")
async def basic_publish_with_instana(
wrapped: Callable[..., aioamqp.connect],
instance: object,
argv: Tuple[object, Tuple[object, ...]],
kwargs: Dict[str, Any],
) -> object:
if tracing_is_off():
return await wrapped(*argv, **kwargs)

tracer, parent_span, _ = get_tracer_tuple()
parent_context = parent_span.get_span_context() if parent_span else None
with tracer.start_as_current_span(
"aioamqp-publisher", span_context=parent_context
) as span:
try:
span.set_attribute("aioamqp.exchange", argv[0])
return await wrapped(*argv, **kwargs)
except Exception as exc:
span.record_exception(exc)
logger.debug(f"aioamqp basic_publish_with_instana error: {exc}")

@wrapt.patch_function_wrapper("aioamqp.channel", "Channel.basic_consume")
def basic_consume_with_instana(
wrapped: Callable[..., aioamqp.connect],
instance: object,
argv: Tuple[object, Tuple[object, ...]],
kwargs: Dict[str, Any],
) -> object:
if tracing_is_off():
return wrapped(*argv, **kwargs)

callback = argv[0]
tracer, parent_span, _ = get_tracer_tuple()
parent_context = parent_span.get_span_context() if parent_span else None

@wrapt.decorator
async def callback_wrapper(
wrapped_callback: Callable[..., aioamqp.connect],
instance: Any,
args: Tuple,
kwargs: Dict,
) -> object:
with tracer.start_as_current_span(
"aioamqp-consumer", span_context=parent_context
) as span:
try:
span.set_status(StatusCode.OK)
span.set_attribute("aioamqp.callback", callback)
span.set_attribute("aioamqp.message", args[1])
span.set_attribute("aioamqp.exchange_name", args[2].exchange_name)
span.set_attribute("aioamqp.routing_key", args[2].routing_key)
return await wrapped_callback(*args, **kwargs)
except Exception as exc:
span.record_exception(exc)
logger.debug(f"aioamqp basic_consume_with_instana error: {exc}")

wrapped_callback = callback_wrapper(callback)
argv = (wrapped_callback,) + argv[1:]

return wrapped(*argv, **kwargs)

logger.debug("Instrumenting aioamqp")

except ImportError:
pass
131 changes: 131 additions & 0 deletions tests/frameworks/test_aioamqp.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
import asyncio
from typing import Any, Generator

import aioamqp
import pytest

from instana.singletons import tracer
from tests.helpers import testenv
from aioamqp.properties import Properties
from aioamqp.envelope import Envelope

testenv["rabbitmq_host"] = "127.0.0.1"
testenv["rabbitmq_port"] = 5672


class TestAioamqp:
@pytest.fixture(autouse=True)
def _resource(self) -> Generator[None, None, None]:
self.recorder = tracer.span_processor
self.recorder.clear_spans()

self.loop = asyncio.new_event_loop()
asyncio.set_event_loop(None)
yield
self.loop.run_until_complete(self.delete_queue())
if self.loop.is_running():
self.loop.close()

async def delete_queue(self) -> None:
transport, protocol = await aioamqp.connect(
testenv["rabbitmq_host"],
testenv["rabbitmq_port"],
)
channel = await protocol.channel()
await channel.queue_delete("message_queue")
await asyncio.sleep(1)

async def publish_message(self) -> None:
transport, protocol = await aioamqp.connect(
testenv["rabbitmq_host"],
testenv["rabbitmq_port"],
)
channel = await protocol.channel()

await channel.queue_declare(queue_name="message_queue")

message = "Instana test message"
await channel.basic_publish(
message.encode(), exchange_name="", routing_key="message_queue"
)

await protocol.close()
transport.close()

async def consume_message(self) -> None:
async def callback(
channel: Any,
body: bytes,
envelope: Envelope,
properties: Properties,
) -> None:
with tracer.start_as_current_span("callback-span"):
await channel.basic_client_ack(delivery_tag=envelope.delivery_tag)

_, protocol = await aioamqp.connect(
testenv["rabbitmq_host"], testenv["rabbitmq_port"]
)
channel = await protocol.channel()
await channel.queue_declare(queue_name="message_queue")
await channel.basic_consume(callback, queue_name="message_queue", no_ack=False)

def test_basic_publish(self) -> None:
with tracer.start_as_current_span("test-span"):
self.loop.run_until_complete(self.publish_message())

spans = self.recorder.queued_spans()

assert len(spans) == 2
publisher_span = spans[0]
test_span = spans[1]

assert publisher_span.n == "sdk"
assert publisher_span.data["sdk"]["name"] == "aioamqp-publisher"
assert publisher_span.p == test_span.s

assert test_span.n == "sdk"
assert not test_span.p

def test_basic_consumer(self) -> None:
with tracer.start_as_current_span("test-span"):
self.loop.run_until_complete(self.publish_message())
self.loop.run_until_complete(self.consume_message())

spans = self.recorder.queued_spans()

assert len(spans) == 4

publisher_span = spans[0]
callback_span = spans[1]
consumer_span = spans[2]
test_span = spans[3]

assert publisher_span.n == "sdk"
assert publisher_span.data["sdk"]["name"] == "aioamqp-publisher"
assert publisher_span.p == test_span.s
assert (
publisher_span.data["sdk"]["custom"]["tags"]["aioamqp.exchange"]
== "b'Instana test message'"
)

assert callback_span.n == "sdk"
assert callback_span.data["sdk"]["name"] == "callback-span"
assert callback_span.data["sdk"]["type"] == "intermediate"
assert callback_span.p == consumer_span.s

assert consumer_span.n == "sdk"
assert consumer_span.data["sdk"]["name"] == "aioamqp-consumer"
assert consumer_span.data["sdk"]["custom"]["tags"]["aioamqp.callback"]
assert (
consumer_span.data["sdk"]["custom"]["tags"]["aioamqp.message"]
== "b'Instana test message'"
)
assert (
consumer_span.data["sdk"]["custom"]["tags"]["aioamqp.routing_key"]
== "message_queue"
)
assert not consumer_span.data["sdk"]["custom"]["tags"]["exchange_name"]
assert consumer_span.p == test_span.s

assert test_span.n == "sdk"
assert test_span.data["sdk"]["name"] == "test-span"
6 changes: 6 additions & 0 deletions tests/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,12 @@
testenv["mongodb_user"] = os.environ.get("MONGO_USER", None)
testenv["mongodb_pw"] = os.environ.get("MONGO_PW", None)

"""
RabbitMQ Environment
"""
testenv["rabbitmq_host"] = os.environ.get("RABBITMQ_HOST", "127.0.0.1")
testenv["rabbitmq_port"] = os.environ.get("RABBITMQ_PORT", 5672)


"""
Kafka Environment
Expand Down
1 change: 1 addition & 0 deletions tests/requirements-pre314.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
aioamqp>=0.15.0
aiofiles>=0.5.0
aiohttp>=3.8.3
boto3>=1.17.74
Expand Down
1 change: 1 addition & 0 deletions tests/requirements.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
aioamqp>=0.15.0
aiofiles>=0.5.0
aiohttp>=3.8.3
boto3>=1.17.74
Expand Down
Loading