
PINK-project/mqttelemetry

 
 


MQTTelemetry

MQTTelemetry is a lightweight library designed to simplify working with MQTT protocols for telemetry data.

License

This project is licensed under the MIT License. See the LICENSE file for details.

Usage

In a FastAPI service, define the telemetry middleware. In this example, all request and response data is sent as the MQTT message body (you may want to filter it before publishing):

"""This module provides middleware for handling MQTT telemetry messages."""

from __future__ import annotations

import asyncio
import logging
import time
from functools import lru_cache

from fastapi import Response
from mqttelemetry import MessagePayload, MessageService

LOGGER = logging.getLogger(__name__)

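@lru_cache
def _get_message_service():
    """Create the MessageService once and reuse it for every request."""
    # Replace the <...> placeholders below with your broker settings.
    return MessageService(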
        hostname=<RABBITMQ_HOSTNAME>,
        username=<RABBITMQ_USERNAME>,
        password=<RABBITMQ_PASSWORD>,
    )


async def mqtt_telemetry_middleware(request, call_next):
    """Publish telemetry for each HTTP request and its response over MQTT."""
    start_time = time.perf_counter_ns()
    response = await call_next(request)

    try:
        body = b""
        async for chunk in response.body_iterator:
            body += chunk
    except Exception as exc:
        LOGGER.warning("Could not extract response body: %s", exc)
        body = b""

    new_response = Response(
        content=body,
        status_code=response.status_code,
        headers=response.headers,
        media_type=response.media_type,
    )
    process_time = time.perf_counter_ns() - start_time
    payload = await MessagePayload.from_request_response(
        request, new_response, process_time
    )
    message_json = payload.model_dump_json()
    topic = f"api{payload.request_url_path.lower().rstrip('/')}/{payload.request_method.lower()}"

    # Offload the blocking send_message call to avoid blocking the event loop.
    loop = asyncio.get_running_loop()
    await loop.run_in_executor(
        None, _get_message_service().send_message, topic, message_json
    )

    return new_response
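
The topic built in the middleware follows the pattern api<path>/<method>, lowercased and with any trailing slash removed from the path. The sketch below isolates that one expression so the resulting topics are easy to inspect. The function name build_topic is hypothetical and is not part of mqttelemetry.

```python
def build_topic(url_path: str, method: str) -> str:
    """Mirror the topic expression used in the middleware above.

    build_topic is a hypothetical helper for illustration only.
    """
    # Lowercase the path, drop trailing slashes, then append the HTTP method.
    return f"api{url_path.lower().rstrip('/')}/{method.lower()}"


if __name__ == "__main__":
    print(build_topic("/Users/42/", "GET"))
    print(build_topic("/", "POST"))
```

Note that a request to the root path produces a topic with no path segment, since the single slash is stripped before the method is appended.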

In the service main module, import mqtt_telemetry_middleware (here assumed to be saved in mqtt_middleware.py) and register it on the application:

from fastapi import FastAPI

from mqtt_middleware import mqtt_telemetry_middleware

app = FastAPI( ...
[...]
)
# Add MQTT Telemetry middleware
app.middleware("http")(mqtt_telemetry_middleware)
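
The usage example sends the full request and response data, and notes that you may want to filter it. One possible approach, sketched here under assumptions, is to redact sensitive values from the serialized JSON before it is passed to send_message. The helper names and the key names in SENSITIVE_KEYS are hypothetical; the actual field names in MessagePayload are not documented here, so adjust them to what your messages contain.

```python
from __future__ import annotations

import json
from typing import Any

# Hypothetical key names; replace with the fields you need to hide.
SENSITIVE_KEYS = frozenset({"authorization", "password", "cookie"})


def redact(value: Any, sensitive_keys: frozenset[str] = SENSITIVE_KEYS) -> Any:
    """Return a copy of value with sensitive dictionary values masked."""
    if isinstance(value, dict):
        return {
            key: "***" if key.lower() in sensitive_keys else redact(item, sensitive_keys)
            for key, item in value.items()
        }
    if isinstance(value, list):
        return [redact(item, sensitive_keys) for item in value]
    return value


def redact_message(message_json: str) -> str:
    """Parse a JSON message, mask sensitive values, and serialize it again."""
    return json.dumps(redact(json.loads(message_json)))


if __name__ == "__main__":
    raw = '{"user": "ann", "headers": {"Authorization": "Bearer abc"}}'
    print(redact_message(raw))
```

In the middleware, this would amount to calling redact_message(message_json) before handing the result to send_message.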

About

MQTT based telemetry for FastAPI applications
