From 846d769113baae40e8aa78d5077b0c7fcbccbc12 Mon Sep 17 00:00:00 2001 From: kurilova Date: Tue, 24 Mar 2026 11:13:17 +0000 Subject: [PATCH 1/2] Adds mqtt message --- framework/python/src/common/logger.py | 9 ++++++ framework/python/src/common/mqtt.py | 30 +++++++++++++++---- framework/python/src/common/tasks.py | 9 ++---- framework/python/src/core/session.py | 16 ++++------ framework/python/src/core/tasks.py | 2 -- framework/python/src/core/testrun.py | 3 -- .../src/net_orc/network_orchestrator.py | 12 ++++---- 7 files changed, 48 insertions(+), 33 deletions(-) diff --git a/framework/python/src/common/logger.py b/framework/python/src/common/logger.py index 972eb2ce5..d1be84cdd 100644 --- a/framework/python/src/common/logger.py +++ b/framework/python/src/common/logger.py @@ -16,6 +16,15 @@ import json import logging import os +from common import mqtt + +class TestrunLogger(logging.Logger): + def ui_info(self, msg, *args, **kwargs): + with mqtt.MQTT(self) as client: + client.send_message(mqtt.MQTTTopic.INFO, {'message': msg}) + self.info(msg, *args, **kwargs) + +logging.setLoggerClass(TestrunLogger) LOGGERS = {} _LOG_FORMAT = '%(asctime)s %(name)-8s %(levelname)-7s %(message)s' diff --git a/framework/python/src/common/mqtt.py b/framework/python/src/common/mqtt.py index b98b4ab1b..bc6fbb399 100644 --- a/framework/python/src/common/mqtt.py +++ b/framework/python/src/common/mqtt.py @@ -16,9 +16,15 @@ import json import typing as t import paho.mqtt.client as mqtt_client -from common import logger +from enum import Enum + +class MQTTTopic(str, Enum): + INFO = "info" + INTERNET_CONNECTION_TOPIC = "events/internet" + NETWORK_ADAPTERS_TOPIC = "events/adapter" + STATUS_TOPIC = "status" + -LOGGER = logger.get_logger("mqtt") WEBSOCKETS_HOST = "localhost" WEBSOCKETS_PORT = 1883 @@ -29,9 +35,20 @@ def __init__(self, message: str) -> None: class MQTT: """ MQTT client class""" - def __init__(self) -> None: + def __init__(self, logger=None) -> None: + self._logger = logger self._host = WEBSOCKETS_HOST self._client = mqtt_client.Client(mqtt_client.CallbackAPIVersion.VERSION2) + self._connect() + + def __enter__(self): + self._connect() + return self + + def __exit__(self, exc_type, exc_value, exc_traceback): + if exc_traceback and self._logger is not None: + self._logger.error(exc_traceback) + self.disconnect() def _connect(self): """Establish connection to MQTT broker""" @@ -39,12 +56,14 @@ def _connect(self): try: self._client.connect(self._host, WEBSOCKETS_PORT, 60) except (ValueError, ConnectionRefusedError): - LOGGER.error("Cannot connect to MQTT broker") + if self._logger is not None: + self._logger.error("Cannot connect to MQTT broker") def disconnect(self): """Disconnect the local client from the MQTT broker""" if self._client.is_connected(): - LOGGER.debug("Disconnecting from broker") + if self._logger is not None: + self._logger.debug("Disconnecting from MQTT broker") self._client.disconnect() def send_message(self, topic: str, message: t.Union[str, dict]) -> None: @@ -54,7 +73,6 @@ def send_message(self, topic: str, message: t.Union[str, dict]) -> None: topic (str): mqtt topic message (t.Union[str, dict]): message """ - self._connect() if isinstance(message, dict): message = json.dumps(message) self._client.publish(topic, str(message)) diff --git a/framework/python/src/common/tasks.py b/framework/python/src/common/tasks.py index 5da0b40c9..b75478e83 100644 --- a/framework/python/src/common/tasks.py +++ b/framework/python/src/common/tasks.py @@ -21,13 +21,12 @@ from fastapi import FastAPI from common import logger +from common.mqtt_topics import MQTTTopic # Check adapters period seconds # Check adapters period seconds CHECK_NETWORK_ADAPTERS_PERIOD = 5 CHECK_INTERNET_PERIOD = 2 -INTERNET_CONNECTION_TOPIC = 'events/internet' -NETWORK_ADAPTERS_TOPIC = 'events/adapter' LOGGER = logger.get_logger('tasks') @@ -48,8 +47,7 @@ def __init__( self.adapters_checker_job = self._scheduler.add_job( func=self._testrun.get_net_orc().network_adapters_checker, kwargs={ - 'mqtt_client': self._mqtt_client, - 'topic': NETWORK_ADAPTERS_TOPIC + 'topic': MQTTTopic.NETWORK_ADAPTERS_TOPIC }, trigger='interval', seconds=CHECK_NETWORK_ADAPTERS_PERIOD, @@ -59,8 +57,7 @@ def __init__( self.internet_shecker = self._scheduler.add_job( func=self._testrun.get_net_orc().internet_conn_checker, kwargs={ - 'mqtt_client': self._mqtt_client, - 'topic': INTERNET_CONNECTION_TOPIC + 'topic': MQTTTopic.INTERNET_CONNECTION_TOPIC }, trigger='interval', seconds=CHECK_INTERNET_PERIOD, diff --git a/framework/python/src/core/session.py b/framework/python/src/core/session.py index 3fc373793..dedf86092 100644 --- a/framework/python/src/core/session.py +++ b/framework/python/src/core/session.py @@ -42,7 +42,6 @@ ALLOW_DISCONNECT_KEY='allow_disconnect' CERTS_PATH = 'local/root_certs' CONFIG_FILE_PATH = 'local/system.json' -STATUS_TOPIC = 'status' MAKE_CONTROL_DIR = 'make/DEBIAN/control' @@ -65,10 +64,11 @@ def wrapper(self, *args, **kwargs): result = method(self, *args, **kwargs) if self.get_status() != TestrunStatus.IDLE and not self.pause_message: - self.get_mqtt_client().send_message( - STATUS_TOPIC, - jsonable_encoder(self.to_json()) - ) + with mqtt.MQTT(LOGGER) as client: + client.send_message( + mqtt.MQTTTopic.STATUS_TOPIC, + jsonable_encoder(self.to_json()) + ) if self.get_status() in STATUSES_COMPLETE: self.pause_message = True @@ -168,9 +168,6 @@ def __init__(self, root_dir): self._timezone = tz[0] LOGGER.debug(f'System timezone is {self._timezone}') - # MQTT client - self._mqtt_client = mqtt.MQTT() - def start(self): self.reset() self._status = TestrunStatus.STARTING @@ -1061,8 +1058,5 @@ def detect_network_adapters_change(self) -> dict: self._ifaces = ifaces_new return adapters - def get_mqtt_client(self): - return self._mqtt_client - def get_ifaces(self): return self._ifaces diff --git a/framework/python/src/core/tasks.py b/framework/python/src/core/tasks.py index 5da0b40c9..466e74b29 100644 --- a/framework/python/src/core/tasks.py +++ b/framework/python/src/core/tasks.py @@ -48,7 +48,6 @@ def __init__( self.adapters_checker_job = self._scheduler.add_job( func=self._testrun.get_net_orc().network_adapters_checker, kwargs={ - 'mqtt_client': self._mqtt_client, 'topic': NETWORK_ADAPTERS_TOPIC }, trigger='interval', @@ -59,7 +58,6 @@ def __init__( self.internet_shecker = self._scheduler.add_job( func=self._testrun.get_net_orc().internet_conn_checker, kwargs={ - 'mqtt_client': self._mqtt_client, 'topic': INTERNET_CONNECTION_TOPIC }, trigger='interval', diff --git a/framework/python/src/core/testrun.py b/framework/python/src/core/testrun.py index 069552320..94a621c11 100644 --- a/framework/python/src/core/testrun.py +++ b/framework/python/src/core/testrun.py @@ -398,9 +398,6 @@ async def stop(self): self.get_session().set_status(TestrunStatus.CANCELLED) - # Disconnect before WS server stops to prevent error - self._mqtt_client.disconnect() - self._stop_network(kill=True) def _register_exits(self): diff --git a/framework/python/src/net_orc/network_orchestrator.py b/framework/python/src/net_orc/network_orchestrator.py index 4acd5f3c3..31bb543d7 100644 --- a/framework/python/src/net_orc/network_orchestrator.py +++ b/framework/python/src/net_orc/network_orchestrator.py @@ -697,14 +697,15 @@ def restore_net(self): def get_session(self): return self._session - def network_adapters_checker(self, mqtt_client: mqtt.MQTT, topic: str): + def network_adapters_checker(self, topic: str): """Checks for changes in network adapters and sends a message to the frontend """ try: adapters = self._session.detect_network_adapters_change() if adapters: - mqtt_client.send_message(topic, adapters) + with mqtt.MQTT(LOGGER) as client: + client.send_message(topic, adapters) except Exception: # pylint: disable=W0703 LOGGER.error(traceback.format_exc()) @@ -713,7 +714,7 @@ def is_device_connected(self): return self._ip_ctrl.check_interface_status( self._session.get_device_interface()) - def internet_conn_checker(self, mqtt_client: mqtt.MQTT, topic: str): + def internet_conn_checker(self, topic: str): """Checks internet connection and sends a status to frontend""" # Default message @@ -739,8 +740,9 @@ def internet_conn_checker(self, mqtt_client: mqtt.MQTT, topic: str): if internet_connection: message['connection'] = True - # Broadcast via MQTT client - mqtt_client.send_message(topic, message) + with mqtt.MQTT(LOGGER) as client: + # Broadcast via MQTT client + client.send_message(topic, message) class NetworkConfig: From 039072ec9d4bcc2d95f61986d3c8c3136847ed18 Mon Sep 17 00:00:00 2001 From: kurilova Date: Thu, 26 Mar 2026 11:19:49 +0000 Subject: [PATCH 2/2] Adds disconnect --- framework/python/src/common/tasks.py | 7 +++---- framework/python/src/core/testrun.py | 3 +++ 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/framework/python/src/common/tasks.py b/framework/python/src/common/tasks.py index b75478e83..544bba663 100644 --- a/framework/python/src/common/tasks.py +++ b/framework/python/src/common/tasks.py @@ -20,8 +20,7 @@ from apscheduler.schedulers.asyncio import AsyncIOScheduler from fastapi import FastAPI -from common import logger -from common.mqtt_topics import MQTTTopic +from common import logger, mqtt # Check adapters period seconds # Check adapters period seconds @@ -47,7 +46,7 @@ def __init__( self.adapters_checker_job = self._scheduler.add_job( func=self._testrun.get_net_orc().network_adapters_checker, kwargs={ - 'topic': MQTTTopic.NETWORK_ADAPTERS_TOPIC + 'topic': mqtt.MQTTTopic.NETWORK_ADAPTERS_TOPIC }, trigger='interval', seconds=CHECK_NETWORK_ADAPTERS_PERIOD, @@ -57,7 +56,7 @@ def __init__( self.internet_shecker = self._scheduler.add_job( func=self._testrun.get_net_orc().internet_conn_checker, kwargs={ - 'topic': MQTTTopic.INTERNET_CONNECTION_TOPIC + 'topic': mqtt.MQTTTopic.INTERNET_CONNECTION_TOPIC }, trigger='interval', seconds=CHECK_INTERNET_PERIOD, diff --git a/framework/python/src/core/testrun.py b/framework/python/src/core/testrun.py index 94a621c11..069552320 100644 --- a/framework/python/src/core/testrun.py +++ b/framework/python/src/core/testrun.py @@ -398,6 +398,9 @@ async def stop(self): self.get_session().set_status(TestrunStatus.CANCELLED) + # Disconnect before WS server stops to prevent error + self._mqtt_client.disconnect() + self._stop_network(kill=True) def _register_exits(self):