MQTTelemetry is a lightweight library designed to simplify working with MQTT protocols for telemetry data.
This project is licensed under the MIT License. See the LICENSE file for details.
In a FastAPI service, define the telemetry middleware. In this example, all request and response data is sent as the MQTT message body (you may want to filter it...):
"""This module provides middleware for handling MQTT telemetry messages."""
from __future__ import annotations

import asyncio
import logging
import time
from functools import lru_cache

from fastapi import Response
from mqttelemetry import MessagePayload, MessageService
@lru_cache
def _get_message_service():
return MessageService(
hostname=<RABBITMQ_HOSTNAME>,
username=<RABBITMQ_USERNAME>,
password=<RABBITMQ_PASSWORD>,
)
async def mqtt_telemetry_middleware(request, call_next):
"""Add telemetry middleware"""
start_time = time.perf_counter_ns()
response = await call_next(request)
try:
body = b""
async for chunk in response.body_iterator:
body += chunk
except Exception as exc:
LOGGER.warning("Could not extract response body: %s", exc)
body = b""
new_response = Response(
content=body,
status_code=response.status_code,
headers=response.headers,
media_type=response.media_type,
)
process_time = time.perf_counter_ns() - start_time
payload = await MessagePayload.from_request_response(
request, new_response, process_time
)
message_json = payload.model_dump_json()
topic = f"api{payload.request_url_path.lower().rstrip('/')}/{payload.request_method.lower()}"
# Offload the blocking send_message call to avoid blocking the event loop.
loop = asyncio.get_event_loop()
await loop.run_in_executor(
None, _get_message_service().send_message, topic, message_json
)
return new_responseIn the service main module, include the mqtt_telemetry_middleware and declare it:
from mqtt_middleware import mqtt_telemetry_middleware
# Create the FastAPI application (constructor arguments are elided in this example).
app = FastAPI( ...
[...]
)
# Add MQTT Telemetry middleware: register mqtt_telemetry_middleware as an
# "http" middleware on the application.
app.middleware("http")(mqtt_telemetry_middleware)